flocore-node/lib/services/address/index.js
sairajzero 6beb2ecd06 Adding function to delete cache for a given addr
Adding _deleteCache: deletes the cache for given address (useful for dev purposes or to recalculate caches)
2023-04-19 02:31:04 +05:30

1441 lines
36 KiB
JavaScript

'use strict';
var BaseService = require('../../service');
var inherits = require('util').inherits;
var async = require('async');
var index = require('../../');
var log = index.log;
var flocore = require('flocore-lib');
var Unit = flocore.Unit;
var _ = flocore.deps._;
var lodash = require('lodash');
var Encoding = require('./encoding');
var Transform = require('stream').Transform;
var assert = require('assert');
var utils = require('../../utils');
var LRU = require('lru-cache');
var XXHash = require('xxhash');
const MAX_TX_QUERY_LIMIT_HISTORY = 1000;
const MAX_TX_QUERY_LIMIT_UTXO = 1000;
const MAX_TX_QUERY_LIMIT_SUMMARY = 5000;
// See rationale about this cache at function getTxList(next)
const TXID_LIST_CACHE_ITEMS = 250; // nr of items (this translates to: consecutive
// clients downloading their tx history)
const TXID_LIST_CACHE_EXPIRATION = 1000 * 30; // ms
const TXID_LIST_CACHE_MIN = 100; // Min items to cache
const TXID_LIST_CACHE_SEED = 0x3233DE; // Min items to cache
var AddressService = function(options) {
BaseService.call(this, options);
this._header = this.node.services.header;
this._block = this.node.services.block;
this._timestamp = this.node.services.timestamp;
this._transaction = this.node.services.transaction;
this._network = this.node.network;
this._db = this.node.services.db;
this._mempool = this.node.services.mempool;
this._txIdListCache = new LRU({
max: TXID_LIST_CACHE_ITEMS,
maxAge: TXID_LIST_CACHE_EXPIRATION
});
if (this._network === 'livenet') {
this._network = 'main';
}
if (this._network === 'regtest') {
this._network = 'testnet';
}
};
inherits(AddressService, BaseService);
AddressService.dependencies = [
'db',
'block',
'header',
'transaction',
'timestamp',
'mempool'
];
// this must return the to-from number of txs for ALL passed in addresses sort from latest txs to earliest
// for example if the query /api/addrs/txs?from=0&to=5&noAsm=1&noScriptSig=1&noSpent=1, and the addresses passed
// in are [addr1, addr2, addr3], then if addr3 has tx1 at height 10, addr2 has tx2 at height 9 and tx1 has no txs,
// then I would pass back [tx1, tx2] in that order
//
// Instead of passing addresses, with from>0, options.cacheKey can be used to define the address set.
//(old one: non-optimized for large data)
AddressService.prototype.__getAddressHistory = function(addresses, options, callback) {
var self = this;
var cacheUsed = false;
options = options || {};
options.from = options.from || 0;
options.to = options.to || 0xffffffff;
options.txIdList = [];
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
if (_.isString(addresses)) {
addresses = [addresses];
}
function getTxList(next) {
function hashAddresses(addresses) {
// Given there are only TXID_LIST_CACHE_ITEMS ~ 250 items cached at the sametime
// a 32 bits hash is secure enough
return XXHash.hash(Buffer.from(addresses.join('')), TXID_LIST_CACHE_SEED);
};
var calculatedCacheKey;
// We use the cache ONLY on from > 0 queries.
//
// Rationale: The a full history is downloaded, the client do
// from =0, to=x
// then from =x+1 to=y
// then [...]
// The objective of this cache is to speed up the from>0 queries, and also
// "freeze" the txid list during download.
//
if (options.from >0 ) {
let cacheKey = options.cacheKey;
if (!cacheKey) {
calculatedCacheKey = hashAddresses(addresses);
cacheKey = calculatedCacheKey;
}
var txIdList = self._txIdListCache.get(cacheKey);
if (txIdList) {
options.txIdList = txIdList;
cacheUsed = true;
return next();
}
}
// Get the list from the db
async.eachLimit(addresses, 4, function(address, next) {
self._getAddressTxidHistory(address, options, next);
}, function(err) {
if (err) return next(err);
var list = lodash.uniqBy(options.txIdList, function(x) {
return x.txid + x.height;
});
options.txIdList = lodash.orderBy(list,['height','txid'], ['desc','asc']);
if (list.length > TXID_LIST_CACHE_MIN) {
calculatedCacheKey = calculatedCacheKey || hashAddresses(addresses);
self._txIdListCache.set(calculatedCacheKey, options.txIdList);
}
return next();
});
};
getTxList(function(err) {
if(err) {
return callback(err);
}
self._getAddressTxHistory(options, function(err, txList) {
if (err) {
return callback(err);
}
var results = {
totalCount: options.txIdList.length || 0,
items: txList,
};
// cacheUsed is returned for testing
callback(null, results, cacheUsed);
});
});
};
AddressService.prototype.getAddressHistory = function(addresses, options, streamer, callback) {
var self = this;
options = options || {};
//options.from = options.from || 0; //Deprecated, use `after` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` option
if(!_.isFunction(callback)){ //if only 3 args, then streamer is callback
callback = streamer;
streamer = () => null; //NULL fn
}
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
if (_.isUndefined(options.mempoolOnly)) {
options.mempoolOnly = false;
}
var old_support = false;
//Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use)
if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) {
old_support = true;
options.from = options.from || 0;
options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT_HISTORY
}
if (_.isString(addresses)) {
addresses = [addresses];
}
var results = {
totalCount: 0,
items: [],
}
async.eachLimit(addresses, 4, function(address, next) {
var addr_options = Object.assign({}, options), addr_count = 0;
self._streamAddressSummary(address, addr_options, function(err, tx){
if(err)
return log.error(err);
addr_count++;
if(!results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array
results.items.unshift(tx); //using unshift, so that recent tx (low) are at front
if(results.items.length > MAX_TX_QUERY_LIMIT_HISTORY) { //remove items from array when overflown
results.items.sort((a, b) => (b.__height || 0xffffffff) - (a.__height || 0xffffffff) || b.txid().localeCompare(a.txid()));
let del_count = results.items.length - MAX_TX_QUERY_LIMIT_HISTORY;
let start_index = old_support ? MAX_TX_QUERY_LIMIT_HISTORY : 0;
results.items.splice(start_index, del_count);
results.incomplete = true;
if(!old_support && addr_count >= MAX_TX_QUERY_LIMIT_HISTORY)
addr_options.flag_stop = true; //limit has reached, stop quering db for more tx
}
streamer(null, tx);
}, next);
}, function(err) {
if (err) {
return callback(err);
}
//sort items in desc block-height, then asc txid (if same height)
results.items.sort((a, b) => (b.__height || 0xffffffff) - (a.__height || 0xffffffff) || b.txid().localeCompare(a.txid()));
results.totalCount = results.items.length ;
//Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use)
if(old_support) {
results.items = results.items.slice(options.from, options.to);
}
callback(null, results);
})
}
// this is basically the same as _getAddressHistory apart from the summary
//(old one: non-optimized for large data)
AddressService.prototype.__getAddressSummary = function(address, options, callback) {
var self = this;
options = options || {};
options.from = options.from || 0;
options.to = options.to || 0xffffffff;
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
var result = {
addrStr: address,
balance: 0,
balanceSat: 0,
totalReceived: 0,
totalReceivedSat: 0,
totalSent: 0,
totalSentSat: 0,
unconfirmedBalance: 0,
unconfirmedBalanceSat: 0,
unconfirmedTxApperances: 0,
txApperances: 0,
};
self.__getAddressHistory(address, options, function(err, results) { //old fn
if (err) {
return callback(err);
}
var txs = results.items;
self._getAddressSummaryResult(txs, address, result, options);
result.balance = Unit.fromSatoshis(result.balanceSat).toBTC();
result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC();
result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC();
result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC();
callback(null, result);
});
};
AddressService.prototype.getAddressSummary = function(address, options, streamer, callback) {
var self = this;
options = options || {};
//options.from = options.from || 0; //Deprecated
//options.to = options.to || 0xffffffff; //Deprecated
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
if(!_.isFunction(callback)){ //if only 3 args, then streamer is callback
callback = streamer;
streamer = () => null; //NULL fn
}
var count = 0;
var result = {
addrStr: address,
balance: 0,
balanceSat: 0,
totalReceived: 0,
totalReceivedSat: 0,
totalSent: 0,
totalSentSat: 0,
unconfirmedBalance: 0,
unconfirmedBalanceSat: 0,
unconfirmedTxApperances: 0,
txApperances: 0,
};
var useCache = _.isUndefined(options.after);
self._loadCache(address, result, useCache, function(err, lastCachedTx) {
if(err)
log.error(err);
if(!_.isUndefined(lastCachedTx))
options.after = lastCachedTx;
self._streamAddressSummary(address, options, function(err, tx) {
if(err)
return log.error(err);
if(tx) {
count++;
self._aggregateAddressSummaryResult(tx, address, result, options);
if(tx.confirmations)
result.lastItem = tx.txid();
}
if(count >= MAX_TX_QUERY_LIMIT_SUMMARY) {//stop quering db when limit reached
options.flag_stop = true;
result.incomplete = true;
}
streamer(null, tx);
}, function(err) {
if (err) {
return callback(err);
}
result.balanceSat = parseInt(result.balanceSat.toFixed());
result.totalReceivedSat = parseInt(result.totalReceivedSat.toFixed());
result.totalSentSat = parseInt(result.totalSentSat.toFixed());
result.txApperances = parseInt(result.txApperances.toFixed());
result.unconfirmedBalanceSat = parseInt(result.unconfirmedBalanceSat.toFixed());
result.unconfirmedTxApperances = parseInt(result.unconfirmedTxApperances.toFixed());
result.balance = Unit.fromSatoshis(result.balanceSat).toBTC();
result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC();
result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC();
result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC();
callback(null, result);
//store in cache if needed
if(useCache) {
if(result.incomplete) //full summary needs to be calculated in background
self._cacheSummaryInBackground(address, result.lastItem, result);
else if (!_.isUndefined(lastCachedTx) && !_.isUndefined(result.lastItem)
&& result.lastItem != lastCachedTx && !self._cacheInstance.has(address)) //update cache if needed
self._storeCache(address, result.lastItem, result);
}
});
})
}
AddressService.prototype._cacheInstance = new Set();
AddressService.prototype._cacheSummaryInBackground = function(address, lastTx, result){
const self = this;
if(self._cacheInstance.has(address))
return;
self._cacheInstance.add(address);
const cache = {
balanceSat: result.balanceSat,
totalReceivedSat: result.totalReceivedSat,
totalSentSat: result.totalSentSat,
txApperances: result.txApperances,
unconfirmedBalanceSat: 0, //unconfirmed (mempool) values should not be cached
unconfirmedTxApperances: 0
};
const options = { queryMempool: false, after: lastTx, noTxList: true };
var lastItem;
self._streamAddressSummary(address, options, function(err, tx) {
if(err)
return log.error(err);
if(tx) {
self._aggregateAddressSummaryResult(tx, address, cache, options);
if(tx.confirmations)
lastItem = tx.txid();
}
}, function(err) {
if (err)
return log.error(err);
cache.balanceSat = parseInt(cache.balanceSat.toFixed());
cache.totalReceivedSat = parseInt(cache.totalReceivedSat.toFixed());
cache.totalSentSat = parseInt(cache.totalSentSat.toFixed());
cache.txApperances = parseInt(cache.txApperances.toFixed());
if(!_.isUndefined(lastItem))
self._storeCache(address, lastItem, cache);
self._cacheInstance.delete(address); //remove from running instance
});
}
AddressService.prototype._storeCache = function(address, lastCacheTx, result, callback) {
const self = this;
var key = self._encoding.encodeAddressCacheKey(address);
var value = self._encoding.encodeAddressCacheValue(lastCacheTx, result.balanceSat, result.totalReceivedSat, result.totalSentSat, result.txApperances)
if(!_.isFunction(callback)) //if callback is not passed, call a empty function
callback = () => null;
self._db.put(key, value, callback);
}
AddressService.prototype._loadCache = function(address, result, useCache, next) {
const self = this;
if(!useCache) //skip if useCache is false (cases like 'after' parameter is used by client)
next();
var key = self._encoding.encodeAddressCacheKey(address);
self._db.get(key, function(err, value) {
if (err) {
return next(err);
}
if (!value) {
return next();
}
var addressCache = self._encoding.decodeAddressCacheValue(value);
//values are in satoshis
result.balanceSat = addressCache.balance;
result.totalReceivedSat = addressCache.received;
result.totalSentSat = addressCache.sent;
result.txApperances = addressCache.txApperances;
next(null, addressCache.lastTx);
});
}
AddressService.prototype._deleteCache = function(address, callback) {
const self = this;
var key = self._encoding.encodeAddressCacheKey(address);
if(!_.isFunction(callback)) //if callback is not passed, call a empty function
callback = () => null;
self._db.del(key, callback);
}
AddressService.prototype._setOutputResults = function(tx, address, result) {
for(var j = 0; j < tx.outputs.length; j++) {
var output = tx.outputs[j];
if (utils.getAddress(output, this._network) !== address) {
continue;
}
result.txApperances++;
result.totalReceivedSat += output.value;
result.balanceSat += output.value;
if (tx.confirmations === 0) {
result.unconfirmedTxApperances++;
result.unconfirmedBalanceSat += output.value;
}
}
return result;
};
AddressService.prototype._setInputResults = function(tx, address, result) {
for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
if (utils.getAddress(input, this._network) !== address) {
continue;
}
result.totalSentSat += tx.__inputValues[i];
result.balanceSat -= tx.__inputValues[i];
if (tx.confirmations === 0) {
result.unconfirmedBalanceSat -= tx.__inputValues[i];
}
}
};
AddressService.prototype._getAddressSummaryResult = function(txs, address, result, options) {
var self = this;
for(var i = 0; i < txs.length; i++) {
var tx = txs[i];
self._setOutputResults(tx, address, result);
self._setInputResults(tx, address, result);
if (!options.noTxList) {
if (!result.transactions) {
result.transactions = [];
}
result.transactions.push(tx.txid());
}
}
return result;
};
AddressService.prototype._getOccurrenceCount = function(tx, address) {
let count = 0;
for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
if(utils.getAddress(input, this._network) === address)
count++;
}
for(var j = 0; j < tx.outputs.length; j++) {
var output = tx.outputs[j];
if(utils.getAddress(output, this._network) === address)
count++;
}
return count;
}
AddressService.prototype._getOutputResults = function(tx, address) {
let value = 0;
for(var j = 0; j < tx.outputs.length; j++) {
var output = tx.outputs[j];
if (utils.getAddress(output, this._network) === address)
value += output.value;
}
return value;
};
AddressService.prototype._getInputResults = function(tx, address) {
let value = 0;
for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
if (utils.getAddress(input, this._network) === address)
value += tx.__inputValues[i];
}
return value;
};
AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options) {
var self = this;
let output_val = self._getOutputResults(tx, address);
let input_val = self._getInputResults(tx, address);
//aggregate the result
if(tx.confirmations) {
result.txApperances++;
result.totalReceivedSat += output_val;
result.balanceSat += output_val;
result.totalSentSat += input_val;
result.balanceSat -= input_val;
} else {
result.unconfirmedTxApperances++;
result.unconfirmedBalanceSat += output_val;
result.unconfirmedBalanceSat -= input_val;
}
if (!options.noTxList) {
if (!result.transactions) {
result.transactions = [];
}
let txid = tx.txid();
if(!result.transactions.includes(txid)) { //push txid only if its not in the array
result.transactions.unshift(txid); //using unshift, so that recent tx (low confirmation) are at front
if(result.transactions.length > MAX_TX_QUERY_LIMIT_SUMMARY)
result.transactions.pop(); //pop the oldest tx in list (when list limit is maxed out)
}
}
}
AddressService.prototype.getAddressUnspentOutputs = function(address, options, callback) {
var self = this;
options = options || {};
options.from = options.from || 0;
options.to = options.to || 0xffffffff;
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
var results = [];
var start = self._encoding.encodeUtxoIndexKey(address);
var final = new Buffer(new Array(73).join('f'), 'hex');
var end = Buffer.concat([ start.slice(0, -36), final ]);
var criteria = {
gte: start,
lt: end
};
async.waterfall([
// query the mempool if necessary
function(next) {
if (!options.queryMempool) {
return next(null, []);
}
self._mempool.getTxidsByAddress(address, 'output', next);
},
// if mempool utxos, then add them first
function(mempoolTxids, next) {
if (mempoolTxids.length <= 0) {
return next();
}
async.eachLimit(mempoolTxids, 4, function(id, next) {
self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return next(err || new Error('Address Service: missing tx: ' + id.txid));
}
results = results.concat(self._getMempoolUtxos(tx, address));
next();
});
});
next();
},
function(next) {
var utxoStream = self._db.createReadStream(criteria);
var streamErr;
utxoStream.on('end', function() {
if (streamErr) {
return callback(streamErr);
}
results = utils.orderByConfirmations(results);
next(null, results);
});
utxoStream.on('error', function(err) {
streamErr = err;
});
utxoStream.on('data', function(data) {
if(results.length >= MAX_TX_QUERY_LIMIT_UTXO) { //Max array limit reached, end response
utxoStream.emit('end');
return;
}
var key = self._encoding.decodeUtxoIndexKey(data.key);
var value = self._encoding.decodeUtxoIndexValue(data.value);
results.push({
address: address,
txid: key.txid,
vout: key.outputIndex,
ts: value.timestamp,
scriptPubKey: value.script.toString('hex'),
amount: Unit.fromSatoshis(value.satoshis).toBTC(),
height: value.height,
satoshis: value.satoshis,
confirmations: self._block.getTip().height - value.height + 1
});
});
}
], callback);
};
AddressService.prototype._getMempoolUtxos = function(tx, address) {
var results = [];
for(var i = 0; i < tx.outputs.length; i++) {
var output = tx.outputs[i];
if (utils.getAddress(output, this._network) !== address) {
continue;
}
results.push({
address: address,
txid: tx.txid(),
vout: i,
scriptPubKey: output.script.toRaw().toString('hex'),
amount: Unit.fromSatoshis(output.value).toBTC(),
height: null,
satoshis: output.value,
confirmations: 0
});
}
return results;
};
AddressService.prototype.getAPIMethods = function() {
return [
['getAddressHistory', this, this.getAddressHistory, 2],
['getAddressSummary', this, this.getAddressSummary, 1],
['getAddressUnspentOutputs', this, this.getAddressUnspentOutputs, 1]
];
};
AddressService.prototype.start = function(callback) {
var self = this;
this._db.getPrefix(this.name, function(err, prefix) {
if(err) {
return callback(err);
}
self._encoding = new Encoding(prefix);
callback();
});
};
AddressService.prototype.stop = function(callback) {
setImmediate(callback);
};
AddressService.prototype._getTxidStream = function(address, options) {
var start;
if(options.after) {
start = this._encoding.encodeAddressIndexKey(address, options.start, options.after, 0xffffffff, 1, 0xffffffff); //0xffffffff is for getting after the txid
} else {
start = this._encoding.encodeAddressIndexKey(address, options.start);
}
var endHeightBuf = new Buffer(4);
endHeightBuf.writeUInt32BE(options.end);
var end = Buffer.concat([
start.slice(0, address.length + 4),
endHeightBuf,
new Buffer(new Array(83).join('f'), 'hex')
]);
var criteria = {
gte: start,
lte: end
//reverse: true // txids stream from low confirmations to high confirmations
};
//NOTE: commentted reverse to keep the order in asc when reading to preserve continuity when using `after` option
// txid stream
var txidStream = this._db.createKeyStream(criteria);
txidStream.on('close', function() {
txidStream.unpipe();
});
return txidStream;
};
//(used by old fn)
AddressService.prototype._getAddressTxHistory = function(options, callback) {
var self = this;
// slice the txids based on pagination needs
var ids = options.txIdList.slice(options.from, options.to);
// go and get the actual txs
async.mapLimit(ids, 4, function(id, next) {
if (id.height === 0xffffffff) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return next(err || new Error('Address Service: could not find tx: ' + id.txid));
}
self._transaction.setTxMetaInfo(tx, options, next);
});
}
self._transaction.getDetailedTransaction(id.txid, options, next);
}, callback);
};
//(used by old fn)
AddressService.prototype._getAddressTxidHistory = function(address, options, callback) {
var self = this;
options = options || {};
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
var results = [];
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
async.waterfall([
// query the mempool for relevant txs for this address
function(next) {
if (!options.queryMempool) {
return next(null, []);
}
self._mempool.getTxidsByAddress(address, 'both', next);
},
// add the meta data such as input values, etc.
function(mempoolTxids, next) {
if (mempoolTxids.length <= 0) {
return next();
}
results = mempoolTxids;
next();
},
// stream the rest of the confirmed txids out of the address index
function(next) {
var txIdTransformStream = new Transform({ objectMode: true });
txIdTransformStream._flush = function(callback) {
txIdTransformStream.emit('end');
callback();
};
txIdTransformStream.on('error', function(err) {
log.error('Address Service: txstream err: ' + err);
txIdTransformStream.unpipe();
});
txIdTransformStream.on('end', function() {
options.txIdList = options.txIdList.concat(results);
next();
});
txIdTransformStream._transform = function(chunk, enc, callback) {
var txInfo = self._encoding.decodeAddressIndexKey(chunk);
if(results.length >= MAX_TX_QUERY_LIMIT_HISTORY) { //Max array limit reached, end response
txIdTransformStream.emit('end');
return;
}
if(!results.some(r => r.txid == txInfo.txid)) //add txid to array only if its not already there
results.push({ txid: txInfo.txid, height: txInfo.height });
callback();
};
var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txIdTransformStream);
}
], callback);
};
AddressService.prototype._streamAddressSummary = function(address, options, streamer, callback) {
var self = this;
options = options || {};
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
//options.from = options.from || 0; //Deprecated, use `after` option
//options.to = options.to || 0xffffffff; //Deprecated, use `after` option
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
if (_.isUndefined(options.mempoolOnly)) {
options.mempoolOnly = false;
}
//declare the queue to process tx data
var tmpTxList = {}; //store processed txid temporarily to ignore duplication
var q = async.queue(function(id, cb) {
//duplication finding
if(id.txid in tmpTxList){
tmpTxList[id.txid][0]++;
if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid
delete tmpTxList[id.txid];
return cb();
} else tmpTxList[id.txid] = [1, null];
if (id.height === 0xffffffff) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return cb(err || new Error('Address Service: could not find tx: ' + id.txid));
}
self._transaction.setTxMetaInfo(tx, options, cb);
});
}
self._transaction.getDetailedTransaction(id.txid, options, cb);
}, 1);
//q.pause(); //pause and wait until queue is set (not needed)
function chunkCallback(err, tx){
if(q.killed || (!err && !tx)) //no error or tx data (duplicate calls will have empty tx value)
return;
if(tx){
let txid = tx.txid();
tmpTxList[txid][1] = self._getOccurrenceCount(tx, address);
if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid
delete tmpTxList[txid];
}
streamer(err, tx);
if((err || options.flag_stop) && !q.killed){
q.kill();
q.killed = true;
return callback();
}
}
async.waterfall([
//Find start height if `after` option is passed
function(next){
if(_.isUndefined(options.after)) {
return next();
}
self._transaction.getTransaction(options.after, options, function(err, tx) {
if(tx && tx.confirmations && tx.height >= options.start) {
options.start = tx.height;
} else {
delete options.after;
}
next();
});
},
// stream the rest of the confirmed txids out of the address index
function(next) {
if (options.mempoolOnly) { //Option to query from mempool only (ie, unconfirmed txs only)
return next();
}
var txIdTransformStream = new Transform({ objectMode: true });
txIdTransformStream._flush = function(cb) {
txIdTransformStream.emit('end');
cb();
};
txIdTransformStream.on('error', function(err) {
log.error('Address Service: txstream err: ' + err);
txIdTransformStream.unpipe();
});
txIdTransformStream.on('end', function() {
next();
});
txIdTransformStream._transform = function(chunk, enc, cb) {
if(options.flag_stop)//stop data query
return txIdTransformStream.unpipe();
var txInfo = self._encoding.decodeAddressIndexKey(chunk);
q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback);
cb();
};
var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txIdTransformStream);
},
// query the mempool for relevant txs for this address
function(next) {
if (!options.queryMempool) {
return next(null, []);
}
self._mempool.getTxidsByAddress(address, 'both', next);
},
// add the meta data such as input values, etc.
function(mempoolTxids, next) {
if (mempoolTxids.length <= 0) {
return next();
}
mempoolTxids.map(id => q.push(id, chunkCallback));
next();
},
//wait for queue to complete
function(next) {
if(!q.started || q.idle()) //No tx in query (or) already finished querying
return next();
else
q.drain = () => next();
}
], callback);
}
AddressService.prototype._removeBlock = function(block, callback) {
var self = this;
async.mapSeries(block.txs, function(tx, next) {
self._removeTx(tx, block, next);
}, callback);
};
AddressService.prototype._removeTx = function(tx, block, callback) {
var self = this;
var operations = [];
async.parallelLimit([
function(next) {
async.eachOfSeries(tx.inputs, function(input, indext, next) {
self._removeInput(input, tx, block, index, function(err, ops) {
if(err) {
return next(err);
}
operations = operations.concat(ops);
next();
});
}, next);
},
function(next) {
async.eachOfSeries(tx.outputs, function(output, index, next) {
self._removeOutput(output, tx, block, index, function(err, ops) {
if(err) {
return next(err);
}
operations = operations.concat(ops);
next();
});
}, next);
}
], 4, function(err) {
if(err) {
return callback(err);
}
callback(null, operations);
});
};
AddressService.prototype._removeInput = function(input, tx, block, index, callback) {
var self = this;
var address = input.getAddress();
var removalOps = [];
if (!address) {
return callback();
}
address.network = self._network;
address = address.toString(self._network);
assert(block && block.__ts && block.__height, 'Missing block or block values.');
removalOps.push({
type: 'del',
key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 1, block.__ts)
});
// look up prev output of this input and put it back in the set of utxos
self._transaction.getTransaction(input.prevout.txid(), function(err, _tx) {
if (err) {
return callback(err);
}
assert(_tx, 'Missing prev tx to insert back into the utxo set when reorging address index.');
assert(_tx.__height && _tx.__inputValues && _tx.__timestamp, 'Missing tx values.');
removalOps.push({
type: 'put',
key: self._encoding.encodeUtxoIndexKey(address, _tx.txid(), input.prevout.index),
value: self._encoding.encodeUtxoIndexValue(
_tx.__height,
_tx.__inputValues[input.prevout.index],
_tx.__timestamp, _tx.outputs[input.prevout.index].script.toRaw())
});
callback(null, removalOps);
});
};
AddressService.prototype._removeOutput = function(output, tx, block, index, callback) {
var self = this;
var address = output.getAddress();
var removalOps = [];
if (!address) {
return callback();
}
address.network = self._network;
address = address.toString(self._network);
assert(block && block.__ts && block.__height, 'Missing block or block values.');
removalOps.push({
type: 'del',
key: self._encoding.encodeAddressIndexKey(address, block.__height, tx.txid(), index, 0, block.__ts)
});
//remove the utxo for this output from the collection
removalOps.push({
type: 'del',
key: self._encoding.encodeUtxoIndexKey(address, tx.txid(), index)
});
setImmediate(function() {
callback(null, removalOps);
});
};
AddressService.prototype.onReorg = function(args, callback) {
var self = this;
var oldBlockList = args[1];
// for every tx, remove the address index key for every input and output
// for every input record, we need to find its previous output and put it back into the utxo collection
async.mapSeries(oldBlockList, self._removeBlock.bind(self), function(err, ops) {
if (err) {
return callback(err);
}
var operations = lodash.compact(lodash.flattenDeep(ops));
callback(null, operations);
});
};
AddressService.prototype.onBlock = function(block, callback) {
var self = this;
if (self.node.stopping) {
return callback();
}
var operations = [];
for(var i = 0; i < block.txs.length; i++) {
var tx = block.txs[i];
var ops = self._processTransaction(tx, { block: block });
operations.push(ops);
}
operations = lodash.flattenDeep(operations);
callback(null, operations);
};
AddressService.prototype._processInput = function(tx, input, index, opts) {
var address = input.getAddress();
if(!address) {
return;
}
address.network = this._network;
address = address.toString(this._network);
var txid = tx.txid();
var timestamp = this._timestamp.getTimestampSync(opts.block.rhash());
assert(timestamp, 'Must have a timestamp in order to process input.');
// address index
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 1, timestamp);
var operations = [{
type: 'put',
key: addressKey
}];
// prev utxo
var rec = {
type: 'del',
key: this._encoding.encodeUtxoIndexKey(address, input.prevout.txid(), input.prevout.index)
};
operations.push(rec);
return operations;
};
AddressService.prototype._processOutput = function(tx, output, index, opts) {
// TODO: if the output is pay to public key, we are reporting this as p2pkh
// this leads to the spending tx not being properly indexed. Txs that
// spend p2pk outputs, will not have the public key as part of their input script sig
var address = output.getAddress();
if(!address) {
return;
}
address.network = this._network;
address = address.toString(this._network);
var txid = tx.txid();
var timestamp = this._timestamp.getTimestampSync(opts.block.rhash());
assert(timestamp, 'Must have a timestamp in order to process output.');
var addressKey = this._encoding.encodeAddressIndexKey(address, opts.block.__height, txid, index, 0, timestamp);
var utxoKey = this._encoding.encodeUtxoIndexKey(address, txid, index);
var utxoValue = this._encoding.encodeUtxoIndexValue(
opts.block.__height,
output.value,
timestamp,
output.script.toRaw()
);
var operations = [{
type: 'put',
key: addressKey
}];
operations.push({
type: 'put',
key: utxoKey,
value: utxoValue
});
return operations;
};
AddressService.prototype._processTransaction = function(tx, opts) {
var self = this;
var _opts = { block: opts.block };
var outputOperations = tx.outputs.map(function(output, index) {
return self._processOutput(tx, output, index, _opts);
});
outputOperations = lodash.compact(lodash.flattenDeep(outputOperations));
assert(outputOperations.length % 2 === 0 &&
outputOperations.length <= tx.outputs.length * 2,
'Output operations count is not reflective of what should be possible.');
var inputOperations = tx.inputs.map(function(input, index) {
return self._processInput(tx, input, index, _opts);
});
inputOperations = lodash.compact(lodash.flattenDeep(inputOperations));
assert(inputOperations.length % 2 === 0 &&
inputOperations.length <= tx.inputs.length * 2,
'Input operations count is not reflective of what should be possible.');
outputOperations = outputOperations.concat(inputOperations);
return outputOperations;
};
module.exports = AddressService;