From 7409dbb77d50c8e80a98abcb9ca1365bcd0faf5f Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sun, 5 Feb 2023 19:03:11 +0530 Subject: [PATCH] Bug fixes - Fixed: incorrect data returned via `from` and `to` option - Fixed: Missing data due to unordered items in getAddressHistory - Fixed: callback invoked multiple items in _streamAddressSummary due to queue parallel limit - Fixed: Queue drain being invoked before mempool txs are pushed into queue --- lib/services/address/index.js | 36 +++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 81bb9712..88c627a4 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -189,6 +189,13 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream options.queryMempool = true; } + //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) + if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { + options.old_support = true; + options.from = options.from || 0; + options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + } + if (_.isString(addresses)) { addresses = [addresses]; } @@ -208,8 +215,15 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream if(!options.txNotNeeded) { results.totalCount++; - if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array + if(!results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array results.items.unshift(tx); //using unshift, so that recent tx (low) are at front + + if(results.items.length > MAX_TX_QUERY_LIMIT) { //remove items from array when overflown + results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); + let del_count = options.old_support ? results.items.length : results.items.length - MAX_TX_QUERY_LIMIT; + let start_index = options.old_support ? MAX_TX_QUERY_LIMIT : 0; + results.items.splice(start_index, del_count); + } } @@ -228,9 +242,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream results.totalCount = parseInt(results.totalCount.toFixed()); //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) - if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { - options.from = options.from || 0; - options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + if(options.old_support) { results.items = results.items.slice(options.from, options.to); } @@ -870,7 +882,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre function chunkCallback(err, tx){ - if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value) + if(q.killed || (!err && !tx)) //no error or tx data (duplicate calls will have empty tx value) return; if(tx){ @@ -883,8 +895,10 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre streamer(err, tx); - if(err || options.flag_stop){ + if((err || options.flag_stop) && !q.killed){ + q.kill(); + q.killed = true; return callback(); } @@ -934,8 +948,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }); txIdTransformStream.on('end', function() { - q.drain = next; - //q.resume(); //(not needed if not paused above) + next(); }); txIdTransformStream._transform = function(chunk, enc, cb) { @@ -975,6 +988,13 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre next(); }, + //wait for queue to complete + function(next) { + + q.drain = () => next(); + + } + ], callback); }