Compare commits

...

10 Commits

Author SHA1 Message Date
sairajzero
8121a2ba96 Bug fix
- Fixed: mempool (unconfirmed tx) getting repeated in chain query when using before option
2023-04-27 04:15:43 +05:30
sairajzero
31344d770e Adding 'before' option to address APIs
- Similar to 'after' option but inverse of it. ie, if before txid is passed, then query list/values before the given txid
- 'before' and 'after' option can be use in combination or individually
- cache system for address summary api wont be used with 'after' and/or 'before' option is used
2023-04-27 03:48:09 +05:30
sairajzero
96677310c2 Bug fixes 2023-04-27 01:41:22 +05:30
sairajzero
e180672583 Adding reverse option to address history query
- Using option `reverse` as true will query the latest 1000 (max val) tx instead of the 1st 1000 tx.
2023-04-23 02:55:49 +05:30
sairajzero
158d5aefc2 Decrease MAX_TX_QUERY_LIMIT_SUMMARY to 500
- Summary cache is triggered for addresses with more than 500 tx
2023-04-22 23:55:11 +05:30
sairajzero
e7248320a6 bug fix: txid & blockhash not cached correctly 2023-04-22 22:19:23 +05:30
sairajzero
60289a644b Fix: Incorrect values in cache during reorg
- store the blockhash of lastTx in cache value
- If last cached tx-block was removed (during reorg), then delete the cache and recalculate the values.
2023-04-21 22:51:45 +05:30
sairajzero
d42e569008 update address-cache key prefix 2023-04-19 03:03:21 +05:30
sairajzero
6beb2ecd06 Adding function to delete cache for a given addr
Adding _deleteCache: deletes the cache for given address (useful for dev purposes or to recalculate caches)
2023-04-19 02:31:04 +05:30
sairajzero
4d023760ad Adding del (delete) function in db service 2023-04-19 02:28:44 +05:30
3 changed files with 185 additions and 90 deletions

View File

@ -4,7 +4,7 @@ function Encoding(servicePrefix) {
this.servicePrefix = servicePrefix;
this.addressIndex = new Buffer('00', 'hex');
this.utxoIndex = new Buffer('01', 'hex');
this.addressCache = new Buffer('ff', 'hex');
this.addressCache = new Buffer('fe', 'hex');
}
Encoding.prototype.encodeAddressIndexKey = function(address, height, txid, index, input, timestamp) {
@ -123,7 +123,7 @@ Encoding.prototype.decodeAddressCacheKey = function(buffer) {
return buffer.slice(3).toString('utf8');
}
Encoding.prototype.encodeAddressCacheValue = function(lastTx, balance, received, sent, txApperances) {
Encoding.prototype.encodeAddressCacheValue = function(lastTx, lastBlock, balance, received, sent, txApperances) {
var buffer = [];
@ -143,9 +143,12 @@ Encoding.prototype.encodeAddressCacheValue = function(lastTx, balance, received,
txApperancesBuffer.writeUInt32BE(txApperances);
buffer.push(txApperancesBuffer);
var txidBuffer = new Buffer(lastTx);
var txidBuffer = new Buffer(lastTx, 'hex');
buffer.push(txidBuffer);
var blkBuffer = new Buffer(lastBlock, 'hex');
buffer.push(blkBuffer);
return Buffer.concat(buffer);
}
@ -155,9 +158,9 @@ Encoding.prototype.decodeAddressCacheValue = function(buffer) {
var received = parseInt(buffer.readBigUInt64BE(8));
var sent = parseInt(buffer.readBigUInt64BE(16));
var txApperances = buffer.readUInt32BE(24);
var lastTx = buffer.slice(28).toString('hex');
return { lastTx, balance, received, sent, txApperances };
var lastTx = buffer.slice(28, 60).toString('hex'); //28 + 32 (tx hash buffer length) = 60
var lastBlock = buffer.slice(60).toString('hex');
return { lastTx, lastBlock, balance, received, sent, txApperances };
}
module.exports = Encoding;

View File

@ -18,7 +18,7 @@ var XXHash = require('xxhash');
const MAX_TX_QUERY_LIMIT_HISTORY = 1000;
const MAX_TX_QUERY_LIMIT_UTXO = 1000;
const MAX_TX_QUERY_LIMIT_SUMMARY = 5000;
const MAX_TX_QUERY_LIMIT_SUMMARY = 500;
// See rationale about this cache at function getTxList(next)
const TXID_LIST_CACHE_ITEMS = 250; // nr of items (this translates to: consecutive
@ -179,8 +179,8 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
var self = this;
options = options || {};
//options.from = options.from || 0; //Deprecated, use `after` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` option
//options.from = options.from || 0; //Deprecated, use `after` and `before` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` and `before` option
if(!_.isFunction(callback)){ //if only 3 args, then streamer is callback
callback = streamer;
@ -195,9 +195,13 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
options.mempoolOnly = false;
}
if(_.isUndefined(options.reverse)) {
options.reverse = false;
}
var old_support = false;
//Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use)
if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) {
if(!_.isUndefined(options.from) || !_.isUndefined(options.to)) {
old_support = true;
options.from = options.from || 0;
options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT_HISTORY
@ -223,13 +227,18 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
addr_count++;
if(!results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array
results.items.unshift(tx); //using unshift, so that recent tx (low) are at front
if(!results.items.some(x => x.txid() === tx.txid())) {//add only if tx not already in array
if(!options.reverse)
results.items.unshift(tx); //using unshift, so that recent tx (low) are at front
else
results.items.push(tx);
}
if(results.items.length > MAX_TX_QUERY_LIMIT_HISTORY) { //remove items from array when overflown
results.items.sort((a, b) => (b.__height || 0xffffffff) - (a.__height || 0xffffffff) || b.txid().localeCompare(a.txid()));
let del_count = results.items.length - MAX_TX_QUERY_LIMIT_HISTORY;
let start_index = old_support ? MAX_TX_QUERY_LIMIT_HISTORY : 0;
let start_index = (old_support || options.reverse) ? MAX_TX_QUERY_LIMIT_HISTORY : 0;
results.items.splice(start_index, del_count);
results.incomplete = true;
@ -314,8 +323,8 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
var self = this;
options = options || {};
//options.from = options.from || 0; //Deprecated
//options.to = options.to || 0xffffffff; //Deprecated
//options.from = options.from || 0; //Deprecated, use `after` and `before` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` and `before` option
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
@ -342,7 +351,8 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
txApperances: 0,
};
var useCache = _.isUndefined(options.after);
var useCache = _.isUndefined(options.after) && _.isUndefined(options.before);
var lastTx, lastBlock;
self._loadCache(address, result, useCache, function(err, lastCachedTx) {
if(err)
@ -360,8 +370,10 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
count++;
self._aggregateAddressSummaryResult(tx, address, result, options);
if(tx.confirmations)
result.lastItem = tx.txid();
if(tx.confirmations) {
lastTx = tx.txid();
lastBlock = tx.blockhash;
}
}
if(count >= MAX_TX_QUERY_LIMIT_SUMMARY) {//stop quering db when limit reached
@ -389,15 +401,17 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC();
result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC();
result.lastItem = lastTx;
callback(null, result);
//store in cache if needed
if(useCache) {
if(result.incomplete) //full summary needs to be calculated in background
self._cacheSummaryInBackground(address, result.lastItem, result);
else if (!_.isUndefined(lastCachedTx) && !_.isUndefined(result.lastItem)
&& result.lastItem != lastCachedTx && !self._cacheInstance.has(address)) //update cache if needed
self._storeCache(address, result.lastItem, result);
self._cacheSummaryInBackground(address, lastTx, lastBlock, result);
else if (!_.isUndefined(lastCachedTx) && !_.isUndefined(lastTx)
&& lastTx != lastCachedTx && !self._cacheInstance.has(address)) //update cache if needed
self._storeCache(address, lastTx, lastBlock, result);
}
});
@ -407,7 +421,7 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer
}
AddressService.prototype._cacheInstance = new Set();
AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, result){
AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, lastBlock, result){
const self = this;
if(self._cacheInstance.has(address))
@ -424,7 +438,6 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
unconfirmedTxApperances: 0
};
const options = { queryMempool: false, after: lastTx, noTxList: true };
var lastItem;
self._streamAddressSummary(address, options, function(err, tx) {
@ -433,8 +446,10 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
if(tx) {
self._aggregateAddressSummaryResult(tx, address, cache, options);
if(tx.confirmations)
lastItem = tx.txid();
if(tx.confirmations){
lastTx = tx.txid();
lastBlock = tx.blockhash;
}
}
}, function(err) {
@ -447,8 +462,8 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
cache.totalSentSat = parseInt(cache.totalSentSat.toFixed());
cache.txApperances = parseInt(cache.txApperances.toFixed());
if(!_.isUndefined(lastItem))
self._storeCache(address, lastItem, cache);
if(!_.isUndefined(lastTx))
self._storeCache(address, lastTx, lastBlock, cache);
self._cacheInstance.delete(address); //remove from running instance
@ -456,10 +471,10 @@ AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, r
}
AddressService.prototype._storeCache = function(address, lastCacheTx, result, callback) {
AddressService.prototype._storeCache = function(address, lastCacheTx, lastCacheBlock, result, callback) {
const self = this;
var key = self._encoding.encodeAddressCacheKey(address);
var value = self._encoding.encodeAddressCacheValue(lastCacheTx, result.balanceSat, result.totalReceivedSat, result.totalSentSat, result.txApperances)
var value = self._encoding.encodeAddressCacheValue(lastCacheTx, lastCacheBlock, result.balanceSat, result.totalReceivedSat, result.totalSentSat, result.txApperances)
if(!_.isFunction(callback)) //if callback is not passed, call a empty function
callback = () => null;
@ -467,34 +482,65 @@ AddressService.prototype._storeCache = function(address, lastCacheTx, result, ca
self._db.put(key, value, callback);
}
AddressService.prototype._loadCache = function(address, result, useCache, next) {
AddressService.prototype._loadCache = function(address, result, useCache, callback) {
const self = this;
if(!useCache) //skip if useCache is false (cases like 'after' parameter is used by client)
next();
if(!useCache) //skip if useCache is false (cases like 'after' and/or 'before' parameter is used by client)
return callback();
var key = self._encoding.encodeAddressCacheKey(address);
self._db.get(key, function(err, value) {
if (err) {
return next(err);
return callback(err);
}
if (!value) {
return next();
return callback();
}
var addressCache = self._encoding.decodeAddressCacheValue(value);
//values are in satoshis
result.balanceSat = addressCache.balance;
result.totalReceivedSat = addressCache.received;
result.totalSentSat = addressCache.sent;
result.txApperances = addressCache.txApperances;
next(null, addressCache.lastTx);
var lastCacheTx = addressCache.lastTx, lastCacheBlock = addressCache.lastBlock
self._block.getBlock(lastCacheBlock, function(err, block) {
if(err) {
return callback(err);
}
if (!block) { //block not found, probably removed in reorg.
//delete the existing cache and recalc values freshly
self._deleteCache(address, function() {
callback();
});
} else {
//values are in satoshis
result.balanceSat = addressCache.balance;
result.totalReceivedSat = addressCache.received;
result.totalSentSat = addressCache.sent;
result.txApperances = addressCache.txApperances;
callback(null, lastCacheTx);
}
})
});
}
AddressService.prototype._deleteCache = function(address, callback) {
const self = this;
var key = self._encoding.encodeAddressCacheKey(address);
if(!_.isFunction(callback)) //if callback is not passed, call a empty function
callback = () => null;
self._db.del(key, callback);
}
AddressService.prototype._setOutputResults = function(tx, address, result) {
for(var j = 0; j < tx.outputs.length; j++) {
@ -825,29 +871,21 @@ AddressService.prototype.stop = function(callback) {
AddressService.prototype._getTxidStream = function(address, options) {
var start;
if(options.after) {
start = this._encoding.encodeAddressIndexKey(address, options.start, options.after, 0xffffffff, 1, 0xffffffff); //0xffffffff is for getting after the txid
} else {
start = this._encoding.encodeAddressIndexKey(address, options.start);
}
var endHeightBuf = new Buffer(4);
endHeightBuf.writeUInt32BE(options.end);
var end = Buffer.concat([
start.slice(0, address.length + 4),
endHeightBuf,
new Buffer(new Array(83).join('f'), 'hex')
]);
var criteria = {
gte: start,
lte: end
//reverse: true // txids stream from low confirmations to high confirmations
};
//NOTE: commentted reverse to keep the order in asc when reading to preserve continuity when using `after` option
var criteria = {};
if(options.after)
criteria.gt = this._encoding.encodeAddressIndexKey(address, options.start, options.after, 0xffffffff, 1, 0xffffffff); //0xffffffff is for getting after the txid
else
criteria.gte = this._encoding.encodeAddressIndexKey(address, options.start);
if(options.before)
criteria.lt = this._encoding.encodeAddressIndexKey(address, options.end, options.before); //get before the txid
else
criteria.lte = this._encoding.encodeAddressIndexKey(address, options.end, Array(65).join('f'), 0xffffffff, 1, 0xffffffff);
//reverse option can be used explictly when latest tx are required
if(options.reverse)
criteria.reverse = true;
// txid stream
var txidStream = this._db.createKeyStream(criteria);
@ -972,8 +1010,8 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
//options.from = options.from || 0; //Deprecated, use `after` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` option
//options.from = options.from || 0; //Deprecated, use `after` and `before` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` and `before` option
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
@ -983,6 +1021,10 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
options.mempoolOnly = false;
}
if (_.isUndefined(options.reverse)) {
options.reverse = false;
}
//declare the queue to process tx data
var tmpTxList = {}; //store processed txid temporarily to ignore duplication
@ -1044,10 +1086,11 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
}
async.waterfall([
const waterfall_array = [];
waterfall_array.push(
//Find start height if `after` option is passed
function(next){
function parse_after_id(next){
if(_.isUndefined(options.after)) {
return next();
@ -1069,10 +1112,36 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
});
},
});
// stream the rest of the confirmed txids out of the address index
function(next) {
waterfall_array.push(
//Find end height if `before` option is passed
function parse_before_id(next){
if(_.isUndefined(options.before)) {
return next();
}
self._transaction.getTransaction(options.before, options, function(err, tx) {
if(tx && tx.confirmations && tx.height <= options.end) {
options.end = tx.height;
} else {
delete options.before;
}
next();
});
});
// stream the confirmed txids out of the address index
function query_confirmed_txids(next) {
if (options.mempoolOnly) { //Option to query from mempool only (ie, unconfirmed txs only)
return next();
@ -1108,31 +1177,38 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txIdTransformStream);
},
}
// query the mempool for relevant txs for this address
function(next) {
function query_mempool_txids(next) {
if (!options.queryMempool) {
return next(null, []);
}
self._mempool.getTxidsByAddress(address, 'both', next);
},
// add the meta data such as input values, etc.
function(mempoolTxids, next) {
if (mempoolTxids.length <= 0) {
if (!options.queryMempool || !_.isUndefined(options.before)) { //if queryMempool=false or options.before is given a valid value, then do not query mempool
return next();
}
mempoolTxids.map(id => q.push(id, chunkCallback));
next();
},
self._mempool.getTxidsByAddress(address, 'both', function(err, mempoolTxids) {
if (mempoolTxids.length <= 0) {
return next();
}
mempoolTxids.map(id => q.push(id, chunkCallback));
next();
});
}
if(options.reverse){ //when queried txs in reverse key order, mempool first then confirmed
waterfall_array.push(query_mempool_txids);
waterfall_array.push(query_confirmed_txids);
} else { //when queried tx in key order, confirmed tx 1st, then mempool
waterfall_array.push(query_confirmed_txids);
waterfall_array.push(query_mempool_txids);
}
waterfall_array.push(
//wait for queue to complete
function(next) {
function end_fall(next) {
if(!q.started || q.idle()) //No tx in query (or) already finished querying
return next();
@ -1140,9 +1216,9 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
else
q.drain = () => next();
}
});
], callback);
async.waterfall(waterfall_array, callback);
}

View File

@ -3,6 +3,7 @@
var util = require('util');
var fs = require('fs');
var async = require('async');
var _ = require('lodash');
var levelup = require('levelup');
var leveldown = require('leveldown');
var mkdirp = require('mkdirp');
@ -150,6 +151,21 @@ DB.prototype.put = function(key, value, callback) {
this._store.put(key, value, callback);
};
DB.prototype.del = function(key, callback) {
if (this._stopping) {
callback();
}
// FLOSight Error Correction from RanchiMall 20th May 2021. removed the unhandled assert and replaced by looging of error
if (Buffer.isBuffer(key) == false) {
log.error('key NOT a buffer as expected.');
}
// assert(Buffer.isBuffer(key), 'key NOT a buffer as expected.');
this._store.del(key, callback);
}
DB.prototype.batch = function(ops, callback) {
if (this._stopping) {