wip on fix for txs list.

This commit is contained in:
Chris Kleeschulte 2017-11-03 18:23:42 -04:00
parent cdec5b4596
commit 765b7288a7
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
4 changed files with 117 additions and 69 deletions

View File

@ -44,6 +44,10 @@ AddressService.dependencies = [
'mempool' 'mempool'
]; ];
// this must return the to-from number of txs for ALL passed in addresses sort from latest txs to earliest
// for example if the query /api/addrs/txs?from=0&to=5&noAsm=1&noScriptSig=1&noSpent=1, and the addresses passed
// in are [addr1, addr2, addr3], then if addr3 has tx1 at height 10, addr2 has tx2 at height 9 and tx1 has no txs,
// then I would pass back [tx1, tx2] in that order
AddressService.prototype.getAddressHistory = function(addresses, options, callback) { AddressService.prototype.getAddressHistory = function(addresses, options, callback) {
var self = this; var self = this;
@ -51,6 +55,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
options = options || {}; options = options || {};
options.from = options.from || 0; options.from = options.from || 0;
options.to = options.to || 0xffffffff; options.to = options.to || 0xffffffff;
options.txIdList = [];
if (_.isUndefined(options.queryMempool)) { if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true; options.queryMempool = true;
@ -60,26 +65,34 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
addresses = [addresses]; addresses = [addresses];
} }
async.mapSeries(addresses, function(address, next) { async.eachLimit(addresses, 8, function(address, next) {
self._getAddressHistory(address, options, next); self._getAddressTxidHistory(address, options, next);
}, function(err, txList) { }, function(err) {
if(err) { if(err) {
return callback(err); return callback(err);
} }
txList = utils.dedupByTxid(txList); // armed with our complete list of txids and heights, we will go and get the actual txs
txList = utils.orderByConfirmations(txList); self._getAddressTxHistory(options, function(err, txList) {
var results = { if (err) {
totalCount: options.txCount || 0, return callback(err);
items: txList }
};
callback(null, results); txList = utils.dedupByTxid(txList);
txList = utils.orderByConfirmations(txList);
var results = {
totalCount: options.txiIdList.length || 0,
items: txList
};
callback(null, results);
});
}); });
}; };
@ -226,21 +239,30 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c
return next(null, []); return next(null, []);
} }
self._mempool.getTxsByAddress(address, 'output', next); self._mempool.getTxidsByAddress(address, 'output', next);
}, },
// if mempool utxos, then add them first // if mempool utxos, then add them first
function(mempoolTxs, next) { function(mempoolTxids, next) {
if (mempoolTxs.length <= 0) { if (mempoolTxids.length <= 0) {
return next(); return next();
} }
mempoolTxs.forEach(function(tx) { async.eachLimit(mempoolTxids, 4, function(id, next) {
results = results.concat(self._getMempoolUtxos(tx, address));
});
next(); self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return next(err || new Error('Address Service: missing tx: ' + id.txid));
}
results = results.concat(self._getMempoolUtxos(tx, address));
next();
});
});
}, },
@ -415,8 +437,41 @@ AddressService.prototype._getTxStream = function(address, options) {
}; };
AddressService.prototype._getAddressTxHistory = function(options, callback) {
var self = this;
// sort the txids by height ascending
var ids = _.sortBy(options.txIdList, function(item) {
return item.height;
});
// slice the txids based on pagination needs
ids = ids.slice(options.from, options.to);
// go and get the actual txs
async.mapLimit(ids, function(id, next) {
if (id.height === -1) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return next(err || new Error('Address Service: could not find tx: ' + id.txid));
}
self._transaction.setTxMetaInfo(tx, options, next);
});
}
self._transaction.getTransaction(id.txid, next);
}, callback);
};
// main api function for insight-api/bws // main api function for insight-api/bws
AddressService.prototype._getAddressHistory = function(address, options, callback) { AddressService.prototype._getAddressTxidHistory = function(address, options, callback) {
var self = this; var self = this;
@ -426,8 +481,6 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
options.endHeightBuf = new Buffer(4); options.endHeightBuf = new Buffer(4);
options.endHeightBuf.writeUInt32BE(options.end); options.endHeightBuf.writeUInt32BE(options.end);
options.results = [];
options.txCount = 0; // this tracks the number of txs in the record set for pagination
if (_.isUndefined(options.queryMempool)) { if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true; options.queryMempool = true;
@ -442,48 +495,47 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
return next(null, []); return next(null, []);
} }
self._mempool.getTxsByAddress(address, 'both', next); self._mempool.getTxidsByAddress(address, 'both', next);
}, },
// add the meta data such as input values, etc. // add the meta data such as input values, etc.
function(mempoolTxs, next) { function(mempoolTxids, next) {
if (mempoolTxs.length <= 0) { if (mempoolTxids.length <= 0) {
return next(); return next();
} }
async.mapSeries(mempoolTxs, function(tx, next) {
self._transaction.setTxMetaInfo(tx, options, next); options.txIdList = mempoolTxids;
}, function(err, txs) { next();
if (err) {
return next(err);
}
// what tx range are we looking for?
options.results = txs.slice(options.from, options.to);
next();
});
}, },
// stream the rest of the confirmed txids out of the address index // stream the rest of the confirmed txids out of the address index
function(next) { function(next) {
if (options.results.length >= (options.to - options.from)) { var txIdTransformStream = new Transform({ objectMode: true });
return callback(null, options.results);
}
options.from += options.results.length; txIdTransformStream._flush = function(callback) {
txIdTransformStream.emit('end');
callback();
};
var txStream = self._getTxStream(address, options); txIdTransformStream.on('error', function(err) {
txStream.on('end', function() {
return next(null, options.results);
});
txStream.on('error', function(err) {
log.error('Address Service: txstream err: ' + err); log.error('Address Service: txstream err: ' + err);
txStream.unpipe(); txIdTransformStream.unpipe();
}); });
txIdTransformStream.on('end', function() {
next(null, options.txIdList);
});
txIdTransformStream._transform = function(chunk, enc, callback) {
var txInfo = self._encoding.decodeAddressIndexKey(chunk);
options.txIdList.push({ txid: txInfo.txid, height: txInfo.height });
callback();
};
var txidStream = self._getTxidStream(address, options); var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txStream); txidStream.pipe(txIdTransformStream);
} }
], callback); ], callback);

View File

@ -29,7 +29,7 @@ var BlockService = function(options) {
this._processingBlock = false; this._processingBlock = false;
this._blocksInQueue = 0; this._blocksInQueue = 0;
this._recentBlockHashesCount = options.recentBlockHashesCount || 1000; // block service won't reorg past this point this._recentBlockHashesCount = options.recentBlockHashesCount || 144; // block service won't reorg past this point
this._recentBlockHashes = new LRU(this._recentBlockHashesCount); this._recentBlockHashes = new LRU(this._recentBlockHashesCount);
this._readAheadBlockCount = options.readAheadBlockCount || 2; // this is the number of blocks to direct the p2p service to read aheead this._readAheadBlockCount = options.readAheadBlockCount || 2; // this is the number of blocks to direct the p2p service to read aheead
}; };

View File

@ -30,7 +30,7 @@ MempoolService.dependencies = ['db'];
MempoolService.prototype.getAPIMethods = function() { MempoolService.prototype.getAPIMethods = function() {
var methods = [ var methods = [
['getMempoolTransaction', this, this.getMempoolTransaction, 1], ['getMempoolTransaction', this, this.getMempoolTransaction, 1],
['getTxsByAddress', this, this.getTxsByAddress, 2], ['getTxidsByAddress', this, this.getTxsByAddress, 2],
]; ];
return methods; return methods;
}; };
@ -53,35 +53,31 @@ MempoolService.prototype.start = function(callback) {
}; };
MempoolService.prototype._flushMempool = function(callback) { MempoolService.prototype._flushMempool = function(callback) {
var self = this; var self = this;
var totalCount = 0;
log.warn('Mempool Service: flushing mempool, this could take a minute.'); log.warn('Mempool Service: flushing mempool, this could take a minute.');
// TODO: just handle the txindex for now, later handle both txindex and addressindex
// TODO: plan a migration system for upgrades to the indexes
var ops = [];
var criteria = { var criteria = {
gte: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('0'), 'hex')]), gte: self._encoding.encodeMempoolTransactionKey(new Array(65).join('0')),
lt: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('f'), 'hex')]) lte: self._encoding.encodeMempoolTransactionKey(new Array(65).join('f'))
}; };
var stream = self._db.createKeyStream(criteria); var stream = self._db.createReadStream(criteria);
stream.on('data', function(key) { stream.on('data', function(data) {
var ops = self._getAddressOperations(self._encoding.decodeMempoolTransactionValue(data.value));
ops.push({ ops.push({
type: 'del', type: 'del',
key: key key: data.key
}); });
self._db.batch(ops);
}); });
stream.on('end', function() { stream.on('end', function() {
self._db.batch(ops, function(err) { log.info('Mempool Service: completed flushing: ' + totalCount + ' tx mempool records.');
if (err) { callback();
return callback(err);
}
log.info('Mempool Service: completed flushing: ' + ops.length + ' mempool records.');
callback();
});
}); });
}; };
@ -254,7 +250,7 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) {
}; };
MempoolService.prototype.getTxsByAddress = function(address, type, callback) { MempoolService.prototype.getTxidsByAddress = function(address, type, callback) {
var self = this; var self = this;
var results = []; var results = [];
@ -266,7 +262,7 @@ MempoolService.prototype.getTxsByAddress = function(address, type, callback) {
lte: end lte: end
}; };
var stream = self._db.createReadStream(criteria); var stream = self._db.createKeyStream(criteria);
stream.on('error', function() { stream.on('error', function() {
return []; return [];
@ -276,15 +272,15 @@ MempoolService.prototype.getTxsByAddress = function(address, type, callback) {
callback(null, results); callback(null, results);
}); });
stream.on('data', function(data) { stream.on('data', function(key) {
var addressInfo = self._encoding.decodeMempoolAddressKey(data.key); var addressInfo = self._encoding.decodeMempoolAddressKey(key);
if (type === 'input') { if (type === 'input') {
type = 1; type = 1;
} else if (type === 'output') { } else if (type === 'output') {
type = 0; type = 0;
} }
if (type === 'both' || type === addressInfo.input) { if (type === 'both' || type === addressInfo.input) {
results.push(self._encoding.decodeMempoolAddressValue(data.value)); results.push({ txid: addressInfo.txid, height: -1 });
} }
}); });

View File

@ -154,7 +154,7 @@ TransactionService.prototype.setTxMetaInfo = function(tx, options, callback) {
} }
// the tx's that contain these input values could, themselves be unconfirmed // the tx's that contain these input values could, themselves be unconfirmed
// we are also assuming that this tx is from thet mempool // we are also assuming that this tx is from the mempool
self._getInputValues(tx, options, function(err, inputValues) { self._getInputValues(tx, options, function(err, inputValues) {
if (err) { if (err) {