This commit is contained in:
Chris Kleeschulte 2017-05-24 15:56:00 -04:00
parent b51179274f
commit 22c537f003
5 changed files with 161 additions and 56 deletions

View File

@ -2,6 +2,7 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var LRU = require('lru-cache');
var Service = function(options) {
EventEmitter.call(this);
@ -86,4 +87,44 @@ Service.prototype.getRoutePrefix = function() {
return this.name;
};
Service.prototype._createConcurrencyCache = function(opts) {
this._concurrencyCache = LRU(opts || 500);
};
Service.prototype._retrieveCachedItems = function(key, valueItem, prevKey, fn) {
var self = this;
var prev = self._concurrencyCache.get(prevKey);
if (prev && !prev.prevKey) {
if (fn) {
valueItem = fn.call(self, valueItem, prev.valueItem);
}
self._concurrencyCache.del(prevKey);
self._concurrencyCache.set(key, { valueItem: valueItem });
return [{ key: key, value: valueItem }];
}
self._concurrencyCache.set(key, { valueItem: valueItem, prevKey: prevKey });
var resolvedDeps = [];
var depKey = key;
self._concurrencyCache.rforEach(function(value, key) {
if (depKey === value.prevKey) {
valueItem = fn.call(self, value.valueItem, depKey.valueItem);
resolvedDeps.push({ key: key, value: valueItem });
depKey = key;
self._concurrencyCache.del(key);
}
});
return resolvedDeps;
};
module.exports = Service;

View File

@ -113,7 +113,7 @@ BlockService.prototype.getBlockOperations = function(block, add, type, callback)
fn = mod.concurrentBlockHandler;
}
if(fn) {
if (fn) {
fn.call(mod, block, add, function(err, ops) {
@ -128,10 +128,13 @@ BlockService.prototype.getBlockOperations = function(block, add, type, callback)
next();
});
} else {
setImmediate(next);
}
},
function(err) {
if (err) {
return callback(err);
}
@ -181,8 +184,7 @@ BlockService.prototype.getBlocks = function(blockArgs, callback) {
}
var cachedBlock = self._rawBlockQueue.get(blockArg);
console.log(cachedBlock);
next(null);
next(null, cachedBlock);
},
function(block, next) {

View File

@ -5,15 +5,13 @@ var BaseService = require('../../service');
var inherits = require('util').inherits;
var LRU = require('lru-cache');
var utils = require('../../../lib/utils');
var bitcore = require('bitcore-lib');
function TimestampService(options) {
BaseService.call(this, options);
this.currentBlock = null;
this.currentTimestamp = null;
this._cache = LRU(50);
this._cache.set(new Array(65).join('0'), { time: 0 });
this._setHandlers();
this._createConcurrencyCache();
this._concurrencyCache.set(new Array(65).join('0'), { valueItem: 0 });
}
inherits(TimestampService, BaseService);
@ -41,55 +39,22 @@ TimestampService.prototype.stop = function(callback) {
setImmediate(callback);
};
TimestampService.prototype._setHandlers = function() {
var self = this;
};
TimestampService.prototype._processBlockHandlerQueue = function(block) {
var self = this;
var blockTime = block.header.timestamp;
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var prev = self._cache.get(prevHash);
if (prev && !prev.prevHash) {
if (blockTime <= prev.time) {
blockTime = prev.time + 1;
}
self._cache.del(prevHash);
self._cache.set(block.hash, { time: blockTime });
return [{ hash: block.hash, time: blockTime }];
}
self._cache.set(block.hash, { time: blockTime, prevHash: prevHash });
var additionalBlocks = [];
var dependentHash = block.hash;
self._cache.rforEach(function(value, key) {
if (dependentHash === value.prevHash) {
additionalBlocks.push({ hash: key, time: value.time });
dependentHash = value.prevHash;
self._cache.del(key);
}
});
return additionalBlocks;
};
TimestampService.prototype.blockHandler = function(block, connectBlock, callback) {
TimestampService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
var self = this;
var action = connectBlock ? 'put' : 'del';
var queue = self._processBlockHandlerQueue(block);
var filter = function(newBlockTime, prevBlockTime) {
if (newBlockTime <= prevBlockTime) {
return prevBlockTime + 1;
}
return newBlockTime;
};
var prevHash = utils.reverseBufferToString(block.header.prevHash);
var hash = block.hash;
var queue = self._retrieveCachedItems(hash, block.header.timestamp, prevHash, filter);
var operations = [];
@ -103,13 +68,13 @@ TimestampService.prototype.blockHandler = function(block, connectBlock, callback
operations = operations.concat([
{
type: action,
key: self.encoding.encodeTimestampBlockKey(item.time),
value: self.encoding.encodeTimestampBlockValue(item.hash)
key: self.encoding.encodeTimestampBlockKey(item.value),
value: self.encoding.encodeTimestampBlockValue(item.key)
},
{
type: action,
key: self.encoding.encodeBlockTimestampKey(item.hash),
value: self.encoding.encodeBlockTimestampValue(item.time)
key: self.encoding.encodeBlockTimestampKey(item.key),
value: self.encoding.encodeBlockTimestampValue(item.value)
}
]);
}

View File

@ -63,6 +63,8 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba
async.series([
function(next) {
// if the timestamp service does not have our data yet, but might in the future, we don't want to let this hold us up
//
self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) {
if(err) {
return next(err);
@ -103,6 +105,7 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba
});
};
TransactionService.prototype._getMissingInputValues = function(tx, callback) {
var self = this;
@ -155,7 +158,8 @@ TransactionService.prototype._getInputValues = function(tx, callback) {
TransactionService.prototype.getTransaction = function(txid, options, callback) {
var self = this;
assert(txid.length === 64, 'Transaction, Txid: ' + txid + ' with length: ' + txid.length + ' does not resemble a txid.');
assert(txid.length === 64, 'Transaction, Txid: ' +
txid + ' with length: ' + txid.length + ' does not resemble a txid.');
if(self.currentTransactions[txid]) {
return setImmediate(function() {

View File

@ -0,0 +1,93 @@
'use strict';
var BaseService = require('../../service');
var inherits = require('util').inherits;
var Encoding = require('./encoding');
function UtxoService(options) {
BaseService.call(this, options);
this._createConcurrencyCache({ max: 500000, dispose: this._getUtxoOperations.bind(this) });
this._operations = [];
}
inherits(UtxoService, BaseService);
UtxoService.dependencies = ['db'];
UtxoService.prototype.start = function(callback) {
var self = this;
self.db = this.node.services.db;
self.db.getPrefix(self.name, function(err, prefix) {
if (err) {
return callback(err);
}
self.prefix = prefix;
self.encoding = new Encoding(self.prefix);
callback();
});
};
UtxoService.prototype.stop = function(callback) {
if (callback) {
setImmediate(callback);
}
};
UtxoService.prototype.concurrencyBlockHandler = function(block, connect, callback) {
var self = this;
var reverseAction = connect ? 'del' : 'put';
var action = connect ? 'put' : 'del';
for(var i = 0; i < block.transactions.length; i++) {
var tx = block.transactions[i];
var inputs = tx.inputs;
var outputs = tx.outputs;
var skipOutput = [];
for(var j = 0; j < inputs.length; j++) {
var input = inputs[j];
if (tx.isCoinbase()) {
continue;
}
if (input.prevHash === tx.hash) {
skipOutput.push(input.outputIndex);
continue;
}
self._concurrencyCache.del(input.prevHash + input.outputTndex);
}
for(var k = 0; k < inputs.length; k++) {
if (skipOutput.indexOf(k) !== -1) {
continue;
}
var output = outputs[k];
self._concurrencyCache.set(tx.hash + k, {
output: output,
height: block.__height,
hash: block.hash
}); // key = 36 bytes, value = (8 + 25ish) + 36 = 69 bytes
}
}
setImmediate(callback);
};
UtxoService.prototype._getUtxoOperations = function(key, value) {
this._operations.push({
action: 'put',
key: this._getKey(key),
value: this._getValue(value)
});
};
module.exports = UtxoService;