Fix: Incorrect values in cache during reorg
- store the blockhash of lastTx in cache value - If last cached tx-block was removed (during reorg), then delete the cache and recalculate the values.
This commit is contained in:
parent
d42e569008
commit
60289a644b
@ -123,7 +123,7 @@ Encoding.prototype.decodeAddressCacheKey = function(buffer) {
|
||||
return buffer.slice(3).toString('utf8');
|
||||
}
|
||||
|
||||
Encoding.prototype.encodeAddressCacheValue = function(lastTx, balance, received, sent, txApperances) {
|
||||
Encoding.prototype.encodeAddressCacheValue = function(lastTx, lastBlock, balance, received, sent, txApperances) {
|
||||
|
||||
var buffer = [];
|
||||
|
||||
@ -146,6 +146,9 @@ Encoding.prototype.encodeAddressCacheValue = function(lastTx, balance, received,
|
||||
var txidBuffer = new Buffer(lastTx);
|
||||
buffer.push(txidBuffer);
|
||||
|
||||
var blkBuffer = new Buffer(lastBlock);
|
||||
buffer.push(blkBuffer);
|
||||
|
||||
return Buffer.concat(buffer);
|
||||
}
|
||||
|
||||
@ -155,9 +158,9 @@ Encoding.prototype.decodeAddressCacheValue = function(buffer) {
|
||||
var received = parseInt(buffer.readBigUInt64BE(8));
|
||||
var sent = parseInt(buffer.readBigUInt64BE(16));
|
||||
var txApperances = buffer.readUInt32BE(24);
|
||||
var lastTx = buffer.slice(28).toString('hex');
|
||||
|
||||
return { lastTx, balance, received, sent, txApperances };
|
||||
var lastTx = buffer.slice(28, 60).toString('hex'); //28 + 32 (tx hash buffer length) = 60
|
||||
var lastBlock = buffer.slice(60).toString('hex');
|
||||
return { lastTx, lastBlock, balance, received, sent, txApperances };
|
||||
}
|
||||
|
||||
module.exports = Encoding;
|
||||
|
||||
@ -343,6 +343,7 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
|
||||
};
|
||||
|
||||
var useCache = _.isUndefined(options.after);
|
||||
var lastTx, lastBlock;
|
||||
|
||||
self._loadCache(address, result, useCache, function(err, lastCachedTx) {
|
||||
if(err)
|
||||
@ -360,8 +361,10 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
|
||||
count++;
|
||||
self._aggregateAddressSummaryResult(tx, address, result, options);
|
||||
|
||||
if(tx.confirmations)
|
||||
result.lastItem = tx.txid();
|
||||
if(tx.confirmations) {
|
||||
lastTx = tx.txid();
|
||||
lastBlock = tx.blockhash;
|
||||
}
|
||||
}
|
||||
|
||||
if(count >= MAX_TX_QUERY_LIMIT_SUMMARY) {//stop quering db when limit reached
|
||||
@ -389,15 +392,17 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
|
||||
result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC();
|
||||
result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC();
|
||||
|
||||
result.lastItem = lastTx;
|
||||
|
||||
callback(null, result);
|
||||
|
||||
//store in cache if needed
|
||||
if(useCache) {
|
||||
if(result.incomplete) //full summary needs to be calculated in background
|
||||
self._cacheSummaryInBackground(address, result.lastItem, result);
|
||||
else if (!_.isUndefined(lastCachedTx) && !_.isUndefined(result.lastItem)
|
||||
&& result.lastItem != lastCachedTx && !self._cacheInstance.has(address)) //update cache if needed
|
||||
self._storeCache(address, result.lastItem, result);
|
||||
self._cacheSummaryInBackground(address, lastTx, lastBlock, result);
|
||||
else if (!_.isUndefined(lastCachedTx) && !_.isUndefined(lastTx)
|
||||
&& lastTx != lastCachedTx && !self._cacheInstance.has(address)) //update cache if needed
|
||||
self._storeCache(address, lastTx, lastBlock, result);
|
||||
}
|
||||
|
||||
});
|
||||
@ -407,7 +412,7 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
|
||||
}
|
||||
|
||||
AddressService.prototype._cacheInstance = new Set();
|
||||
AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, result){
|
||||
AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, lastBlock, result){
|
||||
const self = this;
|
||||
|
||||
if(self._cacheInstance.has(address))
|
||||
@ -424,7 +429,6 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
|
||||
unconfirmedTxApperances: 0
|
||||
};
|
||||
const options = { queryMempool: false, after: lastTx, noTxList: true };
|
||||
var lastItem;
|
||||
|
||||
self._streamAddressSummary(address, options, function(err, tx) {
|
||||
|
||||
@ -433,8 +437,10 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
|
||||
|
||||
if(tx) {
|
||||
self._aggregateAddressSummaryResult(tx, address, cache, options);
|
||||
if(tx.confirmations)
|
||||
lastItem = tx.txid();
|
||||
if(tx.confirmations){
|
||||
lastTx = tx.txid();
|
||||
lastBlock = tx.blockhash;
|
||||
}
|
||||
}
|
||||
|
||||
}, function(err) {
|
||||
@ -447,8 +453,8 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
|
||||
cache.totalSentSat = parseInt(cache.totalSentSat.toFixed());
|
||||
cache.txApperances = parseInt(cache.txApperances.toFixed());
|
||||
|
||||
if(!_.isUndefined(lastItem))
|
||||
self._storeCache(address, lastItem, cache);
|
||||
if(!_.isUndefined(lastTx))
|
||||
self._storeCache(address, lastTx, lastBlock, cache);
|
||||
|
||||
self._cacheInstance.delete(address); //remove from running instance
|
||||
|
||||
@ -456,10 +462,10 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
|
||||
|
||||
}
|
||||
|
||||
AddressService.prototype._storeCache = function(address, lastCacheTx, result, callback) {
|
||||
AddressService.prototype._storeCache = function(address, lastCacheTx, lastCacheBlock, result, callback) {
|
||||
const self = this;
|
||||
var key = self._encoding.encodeAddressCacheKey(address);
|
||||
var value = self._encoding.encodeAddressCacheValue(lastCacheTx, result.balanceSat, result.totalReceivedSat, result.totalSentSat, result.txApperances)
|
||||
var value = self._encoding.encodeAddressCacheValue(lastCacheTx, lastCacheBlock, result.balanceSat, result.totalReceivedSat, result.totalSentSat, result.txApperances)
|
||||
|
||||
if(!_.isFunction(callback)) //if callback is not passed, call a empty function
|
||||
callback = () => null;
|
||||
@ -467,30 +473,50 @@ AddressService.prototype._storeCache = function(address, lastCacheTx, result, ca
|
||||
self._db.put(key, value, callback);
|
||||
}
|
||||
|
||||
AddressService.prototype._loadCache = function(address, result, useCache, next) {
|
||||
AddressService.prototype._loadCache = function(address, result, useCache, callback) {
|
||||
const self = this;
|
||||
|
||||
if(!useCache) //skip if useCache is false (cases like 'after' parameter is used by client)
|
||||
next();
|
||||
callback();
|
||||
|
||||
var key = self._encoding.encodeAddressCacheKey(address);
|
||||
self._db.get(key, function(err, value) {
|
||||
|
||||
if (err) {
|
||||
return next(err);
|
||||
return callback(err);
|
||||
}
|
||||
if (!value) {
|
||||
return next();
|
||||
return callback();
|
||||
}
|
||||
|
||||
var addressCache = self._encoding.decodeAddressCacheValue(value);
|
||||
//values are in satoshis
|
||||
result.balanceSat = addressCache.balance;
|
||||
result.totalReceivedSat = addressCache.received;
|
||||
result.totalSentSat = addressCache.sent;
|
||||
result.txApperances = addressCache.txApperances;
|
||||
|
||||
next(null, addressCache.lastTx);
|
||||
|
||||
var lastCacheTx = addressCache.lastTx, lastCacheBlock = addressCache.lastBlock
|
||||
|
||||
self._block.getBlock(lastCacheBlock, function(err, block) {
|
||||
|
||||
if(err) {
|
||||
return callback(err);
|
||||
}
|
||||
|
||||
if (!block) { //block not found, probably removed in reorg.
|
||||
//delete the existing cache and recalc values freshly
|
||||
self._deleteCache(address, function() {
|
||||
callback();
|
||||
});
|
||||
|
||||
} else {
|
||||
|
||||
//values are in satoshis
|
||||
result.balanceSat = addressCache.balance;
|
||||
result.totalReceivedSat = addressCache.received;
|
||||
result.totalSentSat = addressCache.sent;
|
||||
result.txApperances = addressCache.txApperances;
|
||||
|
||||
callback(null, lastCacheTx);
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user