From 22c537f003d27a9761ee66cafb83537d947fae2d Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Wed, 24 May 2017 15:56:00 -0400 Subject: [PATCH] wip --- lib/service.js | 41 ++++++++++++++ lib/services/block/index.js | 8 ++- lib/services/timestamp/index.js | 69 ++++++----------------- lib/services/transaction/index.js | 6 +- lib/services/utxo/index.js | 93 +++++++++++++++++++++++++++++++ 5 files changed, 161 insertions(+), 56 deletions(-) create mode 100644 lib/services/utxo/index.js diff --git a/lib/service.js b/lib/service.js index 08d0ef5a..59a36d13 100644 --- a/lib/service.js +++ b/lib/service.js @@ -2,6 +2,7 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; +var LRU = require('lru-cache'); var Service = function(options) { EventEmitter.call(this); @@ -86,4 +87,44 @@ Service.prototype.getRoutePrefix = function() { return this.name; }; +Service.prototype._createConcurrencyCache = function(opts) { + this._concurrencyCache = LRU(opts || 500); +}; + +Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) { + var self = this; + + var prev = self._concurrencyCache.get(prevKey); + + if (prev && !prev.prevKey) { + + if (fn) { + valueItem = fn.call(self, valueItem, prev.valueItem); + } + + self._concurrencyCache.del(prevKey); + self._concurrencyCache.set(key, { valueItem: valueItem }); + return [{ key: key, value: valueItem }]; + } + + self._concurrencyCache.set(key, { valueItem: valueItem, prevKey: prevKey }); + + var resolvedDeps = []; + var depKey = key; + + self._concurrencyCache.rforEach(function(value, key) { + + if (depKey === value.prevKey) { + + valueItem = fn.call(self, value.valueItem, depKey.valueItem); + resolvedDeps.push({ key: key, value: valueItem }); + depKey = key; + self._concurrencyCache.del(key); + + } + }); + + return resolvedDeps; +}; + module.exports = Service; diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 6dfba20d..0b558f2a 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -113,7 +113,7 @@ BlockService.prototype.getBlockOperations = function(block, add, type, callback) fn = mod.concurrentBlockHandler; } - if(fn) { + if (fn) { fn.call(mod, block, add, function(err, ops) { @@ -128,10 +128,13 @@ BlockService.prototype.getBlockOperations = function(block, add, type, callback) next(); }); } else { + setImmediate(next); + } }, function(err) { + if (err) { return callback(err); } @@ -181,8 +184,7 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) { } var cachedBlock = self._rawBlockQueue.get(blockArg); - console.log(cachedBlock); - next(null); + next(null, cachedBlock); }, function(block, next) { diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index e27b8f3e..af912141 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -5,15 +5,13 @@ var BaseService = require('../../service'); var inherits = require('util').inherits; var LRU = require('lru-cache'); var utils = require('../../../lib/utils'); -var bitcore = require('bitcore-lib'); function TimestampService(options) { BaseService.call(this, options); this.currentBlock = null; this.currentTimestamp = null; - this._cache = LRU(50); - this._cache.set(new Array(65).join('0'), { time: 0 }); - this._setHandlers(); + this._createConcurrencyCache(); + this._concurrencyCache.set(new Array(65).join('0'), { valueItem: 0 }); } inherits(TimestampService, BaseService); @@ -41,55 +39,22 @@ TimestampService.prototype.stop = function(callback) { setImmediate(callback); }; -TimestampService.prototype._setHandlers = function() { - var self = this; -}; - -TimestampService.prototype._processBlockHandlerQueue = function(block) { - - var self = this; - - var blockTime = block.header.timestamp; - - var prevHash = utils.reverseBufferToString(block.header.prevHash); - - var prev = self._cache.get(prevHash); - - if (prev && !prev.prevHash) { - - if (blockTime <= prev.time) { - blockTime = prev.time + 1; - } - - self._cache.del(prevHash); - self._cache.set(block.hash, { time: blockTime }); - return [{ hash: block.hash, time: blockTime }]; - } - - self._cache.set(block.hash, { time: blockTime, prevHash: prevHash }); - - var additionalBlocks = []; - var dependentHash = block.hash; - - self._cache.rforEach(function(value, key) { - if (dependentHash === value.prevHash) { - additionalBlocks.push({ hash: key, time: value.time }); - dependentHash = value.prevHash; - self._cache.del(key); - } - }); - - return additionalBlocks; - -}; - -TimestampService.prototype.blockHandler = function(block, connectBlock, callback) { +TimestampService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) { var self = this; var action = connectBlock ? 'put' : 'del'; - var queue = self._processBlockHandlerQueue(block); + var filter = function(newBlockTime, prevBlockTime) { + if (newBlockTime <= prevBlockTime) { + return prevBlockTime + 1; + } + return newBlockTime; + }; + + var prevHash = utils.reverseBufferToString(block.header.prevHash); + var hash = block.hash; + var queue = self._retrieveCachedItems(hash, block.header.timestamp, prevHash, filter); var operations = []; @@ -103,13 +68,13 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback operations = operations.concat([ { type: action, - key: self.encoding.encodeTimestampBlockKey(item.time), - value: self.encoding.encodeTimestampBlockValue(item.hash) + key: self.encoding.encodeTimestampBlockKey(item.value), + value: self.encoding.encodeTimestampBlockValue(item.key) }, { type: action, - key: self.encoding.encodeBlockTimestampKey(item.hash), - value: self.encoding.encodeBlockTimestampValue(item.time) + key: self.encoding.encodeBlockTimestampKey(item.key), + value: self.encoding.encodeBlockTimestampValue(item.value) } ]); } diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 3e565496..aaa9cb33 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -63,6 +63,8 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba async.series([ function(next) { + // if the timestamp service does not have our data yet, but might in the future, we don't want to let this hold us up + // self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) { if(err) { return next(err); @@ -103,6 +105,7 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba }); }; + TransactionService.prototype._getMissingInputValues = function(tx, callback) { var self = this; @@ -155,7 +158,8 @@ TransactionService.prototype._getInputValues = function(tx, callback) { TransactionService.prototype.getTransaction = function(txid, options, callback) { var self = this; - assert(txid.length === 64, 'Transaction, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.'); + assert(txid.length === 64, 'Transaction, Txid: ' + + txid + ' with length: ' + txid.length + ' does not resemble a txid.'); if(self.currentTransactions[txid]) { return setImmediate(function() { diff --git a/lib/services/utxo/index.js b/lib/services/utxo/index.js new file mode 100644 index 00000000..47b24164 --- /dev/null +++ b/lib/services/utxo/index.js @@ -0,0 +1,93 @@ +'use strict'; + +var BaseService = require('../../service'); +var inherits = require('util').inherits; +var Encoding = require('./encoding'); + +function UtxoService(options) { + BaseService.call(this, options); + this._createConcurrencyCache({ max: 500000, dispose: this._getUtxoOperations.bind(this) }); + this._operations = []; +} + +inherits(UtxoService, BaseService); + +UtxoService.dependencies = ['db']; + +UtxoService.prototype.start = function(callback) { + var self = this; + + self.db = this.node.services.db; + + self.db.getPrefix(self.name, function(err, prefix) { + if (err) { + return callback(err); + } + self.prefix = prefix; + self.encoding = new Encoding(self.prefix); + callback(); + }); +}; + +UtxoService.prototype.stop = function(callback) { + if (callback) { + setImmediate(callback); + } +}; + +UtxoService.prototype.concurrencyBlockHandler = function(block, connect, callback) { + + var self = this; + var reverseAction = connect ? 'del' : 'put'; + var action = connect ? 'put' : 'del'; + + for(var i = 0; i < block.transactions.length; i++) { + + var tx = block.transactions[i]; + var inputs = tx.inputs; + var outputs = tx.outputs; + var skipOutput = []; + + for(var j = 0; j < inputs.length; j++) { + var input = inputs[j]; + + if (tx.isCoinbase()) { + continue; + } + + if (input.prevHash === tx.hash) { + skipOutput.push(input.outputIndex); + continue; + } + + self._concurrencyCache.del(input.prevHash + input.outputTndex); + } + + for(var k = 0; k < inputs.length; k++) { + + if (skipOutput.indexOf(k) !== -1) { + continue; + } + + var output = outputs[k]; + self._concurrencyCache.set(tx.hash + k, { + output: output, + height: block.__height, + hash: block.hash + }); // key = 36 bytes, value = (8 + 25ish) + 36 = 69 bytes + } + } + + setImmediate(callback); + +}; + +UtxoService.prototype._getUtxoOperations = function(key, value) { + this._operations.push({ + action: 'put', + key: this._getKey(key), + value: this._getValue(value) + }); +}; + +module.exports = UtxoService;