diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 352a08b8..93b883c4 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -46,7 +46,8 @@ BlockService.prototype.getAPIMethods = function() { }; BlockService.prototype.getBestBlockHash = function(callback) { - callback(this._header.getAllHeaders().getLastIndex().hash); + var hash = this._header.getLastHeader().hash; + callback(null, hash); }; BlockService.prototype.getTip = function() { @@ -169,7 +170,8 @@ BlockService.prototype.subscribe = function(name, emitter) { }; BlockService.prototype._syncPercentage = function() { - var ratio = this._tip.height/this._header.getAllHeaders().size; + var height = this._header.getLastHeader().height; + var ratio = this._tip.height/height; return (ratio*100).toFixed(2); }; @@ -262,7 +264,7 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback } var commonAncestorHash = _newTip; - callback(null, hash, commonAncestorHash, oldBlocks); + callback(null, commonAncestorHash, oldBlocks); }); }; @@ -287,15 +289,26 @@ BlockService.prototype._getBlock = function(hash, callback) { }); }; -BlockService.prototype._getHash = function(blockArg) { - - var headers = this._header.getAllHeaders(); +BlockService.prototype._getHash = function(blockArg, callback) { if (utils.isHeight(blockArg)) { - return headers.getIndex(blockArg).hash; + + this._header.getHeaderByHeight(blockArg, function(err, header) { + + if(err) { + return callback(err); + } + + if (!header) { + return callback(); + } + + callback(null, header.hash); + }); + } - return blockArg; + return callback(null, blockArg); }; @@ -310,12 +323,12 @@ BlockService.prototype._handleReorg = function(hash, allHeaders) { log.warn('Block Service: Chain reorganization detected! Our current block tip is: ' + self._tip.hash + ' the current block: ' + hash + '.'); - self._findCommonAncestor(hash, allHeaders, function(err, newHash, commonAncestorHash, oldBlocks) { + self._findCommonAncestor(hash, allHeaders, function(err, commonAncestorHash, oldBlocks) { if (err) { log.error('Block Service: A common ancestor block between hash: ' + - self._tip.hash + ' (our current tip) and: ' + newHash + + self._tip.hash + ' (our current tip) and: ' + hash + ' (the forked block) could not be found. Bitcore-node must exit.'); self.node.stop(); @@ -408,7 +421,6 @@ BlockService.prototype._processBlock = function(block) { var self = this; var operations = []; var services = self.node.services; - var headers = self._header.getAllHeaders(); async.eachSeries( services, @@ -448,7 +460,7 @@ BlockService.prototype._processBlock = function(block) { return; } - self._tip.height = headers.get(block.rhash()).height; + self._tip.height = self._tip.height + 1; self._tip.hash = block.rhash(); var tipOps = utils.encodeTip(self._tip, self.name); @@ -494,6 +506,7 @@ BlockService.prototype._onBlock = function(block) { } log.debug('Block Service: new block: ' + block.rhash()); + block.height = this._tip.height + 1; this._processBlock(block); }; @@ -519,10 +532,7 @@ BlockService.prototype._setTip = function(tip) { BlockService.prototype._startSync = function() { - this._numNeeded = this._header.getAllHeaders().size - this._tip.height; - if (this._numNeeded <= 0) { - return; - } + this._numNeeded = this._header.getLastHeader().height - this._tip.height; log.info('Block Service: Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.'); @@ -546,37 +556,38 @@ BlockService.prototype._startSubscriptions = function() { BlockService.prototype._sync = function() { - if (this.node.stopping) { + var self = this; + + if (self.node.stopping) { return; } - var headers = this._header.getAllHeaders(); - var lastHeaderIndex = headers.size - 1; + var lastHeaderIndex = self._header.getLastHeader().height; - if (this._tip.height < lastHeaderIndex) { + if (self._tip.height < lastHeaderIndex) { - if (this._tip.height % 144 === 0) { + if (self._tip.height % 144 === 0) { log.info('Block Service: Blocks download progress: ' + - this._tip.height + '/' + headers.size + - ' (' + this._syncPercentage() + '%)'); + self._tip.height + '/' + lastHeaderIndex + + ' (' + self._syncPercentage() + '%)'); } - // the end should be our current tip height + 2 blocks, but if we only - // have one block to go, then the end should be 0, so we just get the last block - var endHash; - if (this._tip.height === lastHeaderIndex - 1) { - endHash = 0; - } else { - var end = headers.getIndex(this._tip.height + 2); - endHash = end ? end.hash : null; - } + return self._header.getNextHash(self._tip, function(err, hash) { + + if(err) { + log.error(err); + self.node.stop(); + return; + } + + self._p2p.getBlocks({ startHash: self._tip.hash, endHash: hash }); + }); - this._p2p.getBlocks({ startHash: this._tip.hash, endHash: endHash }); - return; } - log.info('Block Service: The best block hash is: ' + this._tip.hash + - ' at height: ' + this._tip.height); + this._header.blockServiceSyncing = false; + log.info('Block Service: The best block hash is: ' + self._tip.hash + + ' at height: ' + self._tip.height); }; diff --git a/lib/services/header/encoding.js b/lib/services/header/encoding.js index 219f9de7..e077f83f 100644 --- a/lib/services/header/encoding.js +++ b/lib/services/header/encoding.js @@ -3,24 +3,29 @@ function Encoding(servicePrefix) { this._servicePrefix = servicePrefix; + this._hashPrefix = new Buffer('00', 'hex'); + this._heightPrefix = new Buffer('01', 'hex'); } - // ---- hash --> header -Encoding.prototype.encodeHeaderKey = function(height, hash) { - var heightBuf = new Buffer(4); - heightBuf.writeUInt32BE(height); - var hashBuf = new Buffer(hash || new Array(65).join('0'), 'hex'); - return Buffer.concat([ this._servicePrefix, heightBuf, hashBuf ]); +Encoding.prototype.encodeHeaderHashKey = function(hash) { + var hashBuf = new Buffer(hash, 'hex'); + return Buffer.concat([ this._servicePrefix, this._hashPrefix, hashBuf ]); }; -Encoding.prototype.decodeHeaderKey = function(buffer) { - var height = buffer.readUInt32BE(2); - return { - height: height, - hash: buffer.slice(6).toString('hex') - }; +Encoding.prototype.decodeHeaderHashKey = function(buffer) { + return buffer.slice(3).toString('hex'); +}; +// ---- height --> header +Encoding.prototype.encodeHeaderHeightKey = function(height) { + var heightBuf = new Buffer(4); + heightBuf.writeUInt32BE(height); + return Buffer.concat([ this._servicePrefix, this._heightPrefix, heightBuf ]); +}; + +Encoding.prototype.decodeHeaderHeightKey = function(buffer) { + return buffer.readUInt32BE(3); }; Encoding.prototype.encodeHeaderValue = function(header) { diff --git a/lib/services/header/index.js b/lib/services/header/index.js index d29d509c..e5853b5e 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -20,13 +20,15 @@ var HeaderService = function(options) { this._tip = null; this._p2p = this.node.services.p2p; this._db = this.node.services.db; - this._headers = new utils.SimpleMap(); + this._hashes = []; this.subscriptions = {}; this.subscriptions.block = []; this._checkpoint = options.checkpoint || 2000; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; - this._initiallySynced = false; + this._lastHeader = null; + this.blockServiceSyncing = true; + }; inherits(HeaderService, BaseService); @@ -66,13 +68,49 @@ HeaderService.prototype.getAPIMethods = function() { }; +HeaderService.prototype.getAllHeaders = function(callback) { + + var self = this; + var start = self._encoding.encodeHeaderHeightKey(0); + var end = self._encoding.encodeHeaderHeightKey(self._tip.height + 1); + var allHeaders = new utils.SimpleMap(); + + var criteria = { + gte: start, + lt: end + }; + + var stream = self._db.createReadStream(criteria); + + var streamErr; + + stream.on('error', function(error) { + streamErr = error; + }); + + stream.on('data', function(data) { + var header = self._encoding.decodeHeaderValue(data.value); + allHeaders.set(header.hash, header, header.height); + }); + + stream.on('end', function() { + + if (streamErr) { + return streamErr; + } + + callback(null, allHeaders); + + }); +}; + HeaderService.prototype.getBlockHeader = function(arg, callback) { if (utils.isHeight(arg)) { - return callback(null, this._headers.getIndex(arg)); + return this._getHeader(arg, null, callback); } - return callback(null, this._headers.get(arg)); + return this._getHeader(null, arg, callback); }; @@ -116,18 +154,26 @@ HeaderService.prototype.start = function(callback) { merkleRoot: '4a5e1e4baab89f3a32518a88c31bc87f618f76673e2cc77ab2127b7afdeda33b' }; - self._headers.set(self.GENESIS_HASH, genesisHeader, 0); + self._lastHeader = genesisHeader; - self._db._store.put(self._encoding.encodeHeaderKey(0, self.GENESIS_HASH), - self._encoding.encodeHeaderValue(genesisHeader), next); - return; + var dbOps = [ + { + type: 'put', + key: self._encoding.encodeHeaderHeightKey(0), + value: self._encoding.encodeHeaderValue(genesisHeader) + }, + { + type: 'put', + key: self._encoding.encodeHeaderHashKey(self.GENESIS_HASH), + value: self._encoding.encodeHeaderValue(genesisHeader) + } + ]; + + return self._db.batch(dbOps, next); } - next(); + self._getLastHeader(next); }, - function(next) { - self._getPersistedHeaders(next); - } ], function(err) { if (err) { @@ -170,54 +216,83 @@ HeaderService.prototype.getPublishEvents = function() { HeaderService.prototype._onBlock = function(block) { - var hash = block.rhash(); - var header = this._headers.get(hash); + var self = this; + + var hash = block.rhash(); + var prevHash = bcoin.util.revHex(block.prevBlock); + var newBlock = prevHash === self._lastHeader.hash; + + if (newBlock) { - if (!header) { log.debug('Header Service: new block: ' + hash); - if (this._detectReorg()) { - this._handleReorg(); - return; - } + var header = block.toHeaders().toJSON(); - header = block.toHeaders().toJSON(); header.timestamp = header.ts; header.prevHash = header.prevBlock; - this._saveHeaders([this._onHeader(header)]); + + self._saveHeaders(self._onHeader(header)); + } + // this is the rare case that a block comes to us out of order or is a reorg'ed block + // in almost all cases, this will be a reorg + if (!newBlock && !self.blockServiceSyncing) { + + return self._detectReorg(block, function(err, reorg) { + + if (err) { + log.error(err); + self.node.stop(); + return; + } + + if (reorg) { + self._handleReorg(block); + return; + } + + self._broadcast(block); + + }); + + } + + setImmediate(function() { + self._broadcast(block); + }); + +}; + +HeaderService.prototype._broadcast = function(block) { for (var i = 0; i < this.subscriptions.block.length; i++) { - var prevHeader = this._headers.get(header.prevHash); - assert(prevHeader, 'We must have a previous header in order to calculate this block\'s data.'); - block.height = prevHeader.height + 1; - this.subscriptions.block[i].emit('header/block', block, header); + this.subscriptions.block[i].emit('header/block', block); } - }; HeaderService.prototype._onHeader = function(header) { - var prevHeader = this._headers.get(header.prevHash); - assert(prevHeader, 'We must have a previous header in order to calculate this header\'s data, current header is: ' + header.hash); + if (!header) { + return; + } - header.height = prevHeader.height + 1; - header.chainwork = this._getChainwork(header, prevHeader).toString(16, 64); + header.height = this._lastHeader.height + 1; + header.chainwork = this._getChainwork(header, this._lastHeader).toString(16, 64); - var newHdr = { - hash: header.hash, - prevHash: header.prevHash, - height: header.height, - chainwork: header.chainwork - }; + this._lastHeader = header; - this._headers.set(header.hash, newHdr, header.height); - - return { - type: 'put', - key: this._encoding.encodeHeaderKey(header.height, header.hash), - value: this._encoding.encodeHeaderValue(header) - }; + return [ + { + type: 'put', + key: this._encoding.encodeHeaderHashKey(header.hash), + value: this._encoding.encodeHeaderValue(header) + }, + { + type: 'put', + key: this._encoding.encodeHeaderHeightKey(header.height), + value: this._encoding.encodeHeaderValue(header) + } + ]; }; @@ -233,7 +308,9 @@ HeaderService.prototype._onHeaders = function(headers) { header = header.toObject(); - dbOps.push(this._onHeader(header)); + var ops = this._onHeader(header); + + dbOps = dbOps.concat(ops); this._tip.height = header.height; this._tip.hash = header.hash; @@ -258,36 +335,42 @@ HeaderService.prototype._saveHeaders = function(dbOps) { HeaderService.prototype._onHeadersSave = function(err) { - if(err) { + var self = this; + + if (err) { + log.error(err); + self.node.stop(); + return; + } + + if (!self._syncComplete()) { + + self._sync(); + return; + + } + + self._startBlockSubscription(); + + self._setBestHeader(); + + self._detectStartupReorg(function(err, reorg) { + + if (err) { log.error(err); - this.node.stop(); + self.node.stop(); return; } - if (!this._syncComplete()) { - - this._sync(); - return; - - } - - assert(!this._headers.hasNullItems(), 'Header list is not complete yet peer has sent all available headers.'); - - this._startBlockSubscription(); - - this._setBestHeader(); - - if (this._detectReorg()) { - this._handleReorg(); + if (reorg) { + self._handleReorg(); return; } - this._populateNextHashes(); + log.info('Header Service: emitting headers to block service.'); - log.debug('Header Service: emitting headers to block service.'); - this._populateNextHashes(); - - this.emit('headers', this._headers); + self.emit('headers'); + }); }; @@ -310,67 +393,119 @@ HeaderService.prototype._syncComplete = function() { }; - HeaderService.prototype._setBestHeader = function() { - var bestHeader = this._headers.getLastIndex(); + var bestHeader = this._lastHeader; this._tip.height = bestHeader.height; this._tip.hash = bestHeader.hash; log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.'); }; -HeaderService.prototype._populateNextHashes = function() { +HeaderService.prototype._getHeader = function(height, hash, callback) { - var count = 0; + var self = this; - while (count < this._headers.length) { - var hdr = this._headers.getIndex(count); - var nextHdr = this._headers.getIndex(++count); - if (nextHdr) { - hdr.nextHash = nextHdr.hash; - } + /*jshint -W018 */ + if (!hash && !(height >= 0)) { + /*jshint +W018 */ + return callback(new Error('invalid arguments')); } + + var key; + if (hash) { + key = self._encoding.encodeHeaderHashKey(hash); + } else { + key = self._encoding.encodeHeaderHeightKey(height); + } + + self._db.get(key, function(err, data) { + + if (err) { + return callback(err); + } + + if (!data) { + return callback(); + } + + callback(null, self._encoding.decodeHeaderValue(data)); + + }); + }; -HeaderService.prototype._detectReorg = function(block) { +HeaderService.prototype._detectReorg = function(block, callback) { - // this is a new block coming after we are sync'ed. - // for this not to be a reorg, this block's prev hash should be our tip - // in rare cases, we don't even have this block's parent in our collection - if (block) { - var prevHash = bcoin.util.revHex(block.prevBlock); - if (prevHash === this._tip.hash) { - return false; + assert(block, 'Block is needed to detect reorg.'); + +console.log(bcoin.util.revHex(block.prevBlock)); + var key = this._encoding.encodeHeaderHashKey(bcoin.util.revHex(block.prevBlock)); + + this._db.get(key, function(err, val) { + + if (err) { + return callback(err); } - var parentHeader = this._headers.get(prevHash); - if (!parentHeader) { - log.warn('Block with hash: ' + block.rhash() + - ' is not in our header set. This could be a block delievered out of order or this block\'s parent: ' + - prevHash + ' has been orphaned before we synced our headers the last time.'); - return false; + // is this block's prevHash already referenced in the database? If so, reorg + if (val) { + return callback(null, true); } - return true; - } - // is our original tip's height and hash the same after we rewound by the checkpoint amount of blocks - // and re-imported? If so, then we've reorg'ed since we've been shut down. - var headerHash = this._headers.getIndex(this._originalTip.height).hash; + callback(null, false); - assert(headerHash, 'Expected a header to exist at height ' + this._originalTip.height); - - if (this._originalTip.hash !== headerHash) { - return true; - } - - return false; + }); }; -HeaderService.prototype._handleReorg = function() { - this.emit('reorg', this._headers.getIndex(this._originalTip.height).hash, this._headers); +HeaderService.prototype._detectStartupReorg = function(callback) { + + var self = this; + + self._getHeader(self._originalTip.height, null, function(err, header) { + + if (err) { + return callback(err); + } + + if (!header) { + return callback(null, true); + } + + if (header.hash !== self._originalTip.hash) { + return callback(null, true); + } + + callback(null, false); + + }); + +}; + +HeaderService.prototype._handleReorg = function(block) { + + var self = this; + self.getAllHeaders(function(err, headers) { + + if (err || !headers) { + log.error(err || new Error('Missing headers')); + self.node.stop(); + return; + } + + var hash = headers.getIndex(self._originalTip.height).hash; + + if (block) { + hash = block.rhash(); + } + + assert(hash, 'To reorg, we need a hash to reorg to.'); + self.emit('reorg', hash, headers); + + }); + }; HeaderService.prototype._setListeners = function() { @@ -399,22 +534,65 @@ HeaderService.prototype._startSync = function() { HeaderService.prototype._sync = function() { - log.debug('Header Service: download progress: ' + this._tip.height + '/' + + log.info('Header Service: download progress: ' + this._tip.height + '/' + this._bestHeight + ' (' + (this._tip.height / this._bestHeight*100.00).toFixed(2) + '%)'); this._p2p.getHeaders({ startHash: this._tip.hash }); }; -HeaderService.prototype.getAllHeaders = function() { - return this._headers; -}; - -HeaderService.prototype._getPersistedHeaders = function(callback) { +// this gets the header that is +2 places from hash or returns 0 if there is no such +HeaderService.prototype.getNextHash = function(tip, callback) { var self = this; - var startingHeight = self._tip.height; + // if the tip being passed in is the second to last block, then return 0 because there isn't a block + // after the last block + if (tip.height + 1 === self._tip.height) { + return callback(null, 0); + } + + var start = self._encoding.encodeHeaderHeightKey(tip.height + 2); + var end = self._encoding.encodeHeaderHeightKey(tip.height + 3); + var result = 0; + + var criteria = { + gte: start, + lt: end + }; + + var stream = self._db.createReadStream(criteria); + + var streamErr; + + stream.on('error', function(error) { + streamErr = error; + }); + + stream.on('data', function(data) { + result = self._encoding.decodeHeaderValue(data.value).hash; + }); + + stream.on('end', function() { + + if (streamErr) { + return streamErr; + } + + callback(null, result); + + }); + +}; + +HeaderService.prototype.getLastHeader = function() { + assert(this._lastHeader, 'Last headers should be populated.'); + return this._lastHeader; +}; + +HeaderService.prototype._getLastHeader = function(callback) { + + var self = this; if (self._tip.height > self._checkpoint) { self._tip.height -= self._checkpoint; @@ -422,10 +600,10 @@ HeaderService.prototype._getPersistedHeaders = function(callback) { var removalOps = []; - var start = self._encoding.encodeHeaderKey(0); - var end = self._encoding.encodeHeaderKey(startingHeight + 1); + var start = self._encoding.encodeHeaderHeightKey(self._tip.height); + var end = self._encoding.encodeHeaderHeightKey(0xffffffff); - log.info('Getting persisted headers from genesis block to block ' + self._tip.height); + log.info('Getting last header synced at height: ' + self._tip.height); var criteria = { gte: start, @@ -450,17 +628,10 @@ HeaderService.prototype._getPersistedHeaders = function(callback) { key: data.key }); return; + } else if (header.height === self._tip.height) { + self._lastHeader = header; } - var newHdr = { - hash: header.hash, - prevHash: header.prevHash, - height: header.height, - chainwork: header.chainwork - }; - - self._headers.set(self._encoding.decodeHeaderKey(data.key).hash, newHdr, header.height); - }); stream.on('end', function() { @@ -469,8 +640,8 @@ HeaderService.prototype._getPersistedHeaders = function(callback) { return streamErr; } - var tipHeader = self._headers.getIndex(self._tip.height); - self._tip.hash = tipHeader.hash; + assert(self._lastHeader, 'The last synced header was not in the database.'); + self._tip.hash = self._lastHeader.hash; self._db.batch(removalOps, callback); }); diff --git a/lib/services/insight-api b/lib/services/insight-api new file mode 120000 index 00000000..368a038a --- /dev/null +++ b/lib/services/insight-api @@ -0,0 +1 @@ +/home/k/source/insight-api \ No newline at end of file diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 46d4c6ad..2a5800fd 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -111,6 +111,7 @@ MempoolService.prototype._startSubscriptions = function() { }; MempoolService.prototype.onBlock = function(block, callback) { + // remove this block's txs from mempool var self = this; var ops = block.txs.map(function(tx) { @@ -120,6 +121,7 @@ MempoolService.prototype.onBlock = function(block, callback) { }; }); callback(null, ops); + }; MempoolService.prototype._onTransaction = function(tx) { @@ -135,6 +137,4 @@ MempoolService.prototype.stop = function(callback) { callback(); }; - module.exports = MempoolService; - diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 5bbee3ca..a1b4ae4c 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -5,7 +5,6 @@ var inherits = require('util').inherits; var Encoding = require('./encoding'); var utils = require('../../utils'); var _ = require('lodash'); -var LRU = require('lru-cache'); var Unit = require('bitcore-lib').Unit; var log = require('../../index').log; var async = require('async'); @@ -14,11 +13,11 @@ var assert = require('assert'); function TransactionService(options) { BaseService.call(this, options); this._db = this.node.services.db; - this._mempool = this.node.services._mempool; + this._mempool = this.node.services.mempool; this._block = this.node.services.block; + this._header = this.node.services.header; this._p2p = this.node.services.p2p; this._timestamp = this.node.services.timestamp; - this._inputValuesCache = LRU(500000); // this should speed up syncing } inherits(TransactionService, BaseService); @@ -45,75 +44,6 @@ TransactionService.prototype.getDetailedTransaction = function(txid, options, ca this.getTransaction(txid, options, callback); }; -TransactionService.prototype._getTransaction = function(txid, options, callback) { - var self = this; - - var _tx; - var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool; - - var key = self._encoding.encodeTransactionKey(txid); - - self._db.get(key, function(err, tx) { - - if(err) { - return callback(err); - } - - if (queryMempool && !tx) { - - self._mempool.getTransaction(tx, function(err, memTx) { - - if(err) { - return callback(err); - } - - if (memTx) { - memTx = self._encoding.decodeTransactionValue(memTx); - memTx.confirmations = 0; - _tx = memTx; - } - - }); - - } else { - - if (tx) { - tx = self._encoding.decodeTransactionValue(tx); - tx.confirmations = self._p2p.getBestHeight - tx.__height; - tx.blockHash = self._header.get(tx.__height).hash; - _tx = tx; - } - - } - - if (!_tx) { - return callback(); - } - - var outputSatoshis = 0; - _tx.outputs.forEach(function(output) { - outputSatoshis += output.value; - }); - _tx.outputs.forEach(function(output) { - outputSatoshis += output.value; - }); - _tx.outputSatoshis = outputSatoshis; - if (!_tx.inputs[0].isCoinbase()) { - var inputSatoshis = 0; - _tx.__inputValues.forEach(function(val) { - if (val >- 0) { - inputSatoshis += val; - } - }); - var feeSatoshis = inputSatoshis - outputSatoshis; - _tx.inputSatosbis = inputSatoshis; - _tx.feeSatosbis = feeSatoshis; - } - callback(null, _tx); - }); - -}; - TransactionService.prototype.getTransaction = function(txid, options, callback) { var self = this; @@ -138,6 +68,113 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) }; +TransactionService.prototype._getTransaction = function(txid, options, callback) { + + var self = this; + + var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool; + + var key = self._encoding.encodeTransactionKey(txid); + + async.waterfall([ + // main index? + function(next) { + self._db.get(key, function(err, tx) { + + if (err) { + return next(err); + } + + if (!tx) { + return next(null, tx); + } + + tx = self._encoding.decodeTransactionValue(tx); + tx.confirmations = self._header.getBestHeight() - tx.__height; + + self._header.getBlockHeader(tx.__height, function(err, header) { + + if (err) { + return next(err); + } + + if (header) { + tx.blockHash = header.hash; + } + + next(null, tx); + + }); + }); + }, + // mempool? + function(tx, next) { + + if (queryMempool && !tx) { + + return self._mempool.getMempoolTransaction(txid, function(err, memTx) { + + if (err) { + return next(err); + } + + if (memTx) { + memTx = self._encoding.decodeTransactionValue(memTx); + memTx.confirmations = 0; + } + + next(null, memTx); + }); + + } + + next(null, tx); + + } + ], function(err, tx) { + + if (err) { + return callback(err); + } + + if (!tx) { + return callback(); + } + + // output values + var outputSatoshis = 0; + + tx.outputs.forEach(function(output) { + outputSatoshis += output.value; + }); + + tx.outputSatoshis = outputSatoshis; + + + //input values + if (!tx.inputs[0].isCoinbase()) { + + var inputSatoshis = 0; + + tx.__inputValues.forEach(function(val) { + + if (val >- 0) { + inputSatoshis += val; + } + }); + + var feeSatoshis = inputSatoshis - outputSatoshis; + tx.inputSatosbis = inputSatoshis; + tx.feeSatosbis = feeSatoshis; + + } + + callback(null, tx); + + }); + +}; + TransactionService.prototype._addMissingInputValues = function(tx, options, callback) { // if we have cache misses from when we populated input values, @@ -147,11 +184,12 @@ TransactionService.prototype._addMissingInputValues = function(tx, options, call var inputSatoshis = tx.__inputValues[index]; - if (inputSatoshis >= 0) { + if (inputSatoshis >= 0 || input.isCoinbase()) { return next(); } var outputIndex = input.prevout.index; + self._getTransaction(input.prevout.txid(), options, function(err, _tx) { if (err || !_tx) { @@ -171,7 +209,18 @@ TransactionService.prototype._addMissingInputValues = function(tx, options, call return callback(err); } - callback(null, tx); + var key = self._encoding.encodeTransactionKey(tx.txid()); + var value = self._encoding.encodeTransactionValue(tx); + + self._db.put(key, value, function(err) { + + if (err) { + return callback(err); + } + + callback(null, tx); + + }); }); }; @@ -211,33 +260,10 @@ TransactionService.prototype.stop = function(callback) { }; // --- start private prototype functions -TransactionService.prototype._cacheOutputValues = function(tx) { - - var values = tx.outputs.map(function(output) { - return Unit.fromBTC(output.value).toSatoshis(); - }); - - this._inputValuesCache.set(tx.txid(), values); - -}; - TransactionService.prototype._getBlockTimestamp = function(hash) { return this._timestamp.getTimestampSync(hash); }; -TransactionService.prototype._getInputValues = function(tx) { - - var self = this; - - return tx.inputs.map(function(input) { - var value = self._inputValuesCache.get(input.prevout.txid()); - if (value) { - return value[input.prevout.index]; - } - return null; - }); -}; - TransactionService.prototype.onBlock = function(block, callback) { var self = this; @@ -250,9 +276,7 @@ TransactionService.prototype.onBlock = function(block, callback) { return self._processTransaction(tx, { block: block }); }); - setImmediate(function() { - callback(null, operations); - }); + callback(null, operations); }; @@ -296,20 +320,20 @@ TransactionService.prototype._onReorg = function(commonAncestorHeader, oldBlockL TransactionService.prototype._processTransaction = function(tx, opts) { - // squirrel away he current outputs - this._cacheOutputValues(tx); - // this index is very simple txid -> tx, but we also need to find each // input's prev output value, the adjusted timestamp for the block and // the tx's block height // input values - tx.__inputValues = this._getInputValues(tx); //if there are any nulls here, this is a cache miss + tx.__inputValues = []; // these are lazy-loaded on the first access of the tx - //timestamp + // timestamp tx.__timestamp = this._getBlockTimestamp(opts.block.rhash()); - //height + assert(tx.__timestamp, 'Timestamp is required when saving a transaction.'); + + // height tx.__height = opts.block.height; + assert(tx.__height, 'Block height is required when saving a trasnaction.'); return { key: this._encoding.encodeTransactionKey(tx.txid()), @@ -330,7 +354,6 @@ TransactionService.prototype._startSubscriptions = function() { } this._bus.on('block/reorg', this._onReorg.bind(this)); - this._bus.subscribe('block/reorg'); }; diff --git a/package.json b/package.json index a2e52efc..fa754b02 100644 --- a/package.json +++ b/package.json @@ -63,6 +63,7 @@ "liftoff": "^2.2.0", "lodash": "^4.17.4", "lru-cache": "^4.0.2", + "memwatch-next": "^0.3.0", "mkdirp": "0.5.0", "path-is-absolute": "^1.0.0", "socket.io": "^1.4.5",