Adding search mempool by address.

This commit is contained in:
Chris Kleeschulte 2017-10-05 16:18:16 -04:00
parent 67ce58d698
commit c7c268f00a
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
4 changed files with 209 additions and 81 deletions

View File

@ -11,6 +11,7 @@ var _ = bitcore.deps._;
var Encoding = require('./encoding');
var Transform = require('stream').Transform;
var assert = require('assert');
var Stream = require('stream');
var AddressService = function(options) {
@ -22,6 +23,7 @@ var AddressService = function(options) {
this._transaction = this.node.services.transaction;
this._network = this.node.network;
this._db = this.node.services.db;
this._mempool = this.node.services.mempool;
if (this._network === 'livenet') {
this._network = 'main';
@ -39,7 +41,8 @@ AddressService.dependencies = [
'block',
'header',
'transaction',
'timestamp'
'timestamp',
'mempool'
];
// ---- public function prototypes
@ -156,13 +159,13 @@ AddressService.prototype.getAddressSummary = function(address, options, callback
self._tx.getTransaction(key.txid, options, function(err, tx) {
if(err) {
log.error(err);
log.error('Address Service: gettransaction ' + err);
txStream.emit('error', err);
return;
}
if (!tx) {
log.error('Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.');
log.error('Address Service: Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.');
txStream.emit('error', new Error('Txid should map to a tx.'));
return;
}
@ -201,7 +204,7 @@ AddressService.prototype.getAddressSummary = function(address, options, callback
};
txStream.on('error', function(err) {
log.error(err);
log.error('Address Service: txstream on error ' + err);
txStream.unpipe();
});
@ -295,28 +298,12 @@ AddressService.prototype.stop = function(callback) {
setImmediate(callback);
};
AddressService.prototype._getTxidStream = function(address, options) {
// ---- start private function prototypes
AddressService.prototype._getAddressHistory = function(address, options, callback) {
var self = this;
options = options || {};
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
var endHeightBuf = new Buffer(4);
endHeightBuf.writeUInt32BE(options.end);
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
var results = [];
var start = self._encoding.encodeAddressIndexKey(address, options.start);
var start = this._encoding.encodeAddressIndexKey(address, options.start);
var end = Buffer.concat([
start.slice(0, address.length + 4),
endHeightBuf,
options.endHeightBuf,
new Buffer(new Array(83).join('f'), 'hex')
]);
@ -326,81 +313,72 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
};
// txid stream
var txidStream = self._db.createKeyStream(criteria);
var txidStream = this._db.createKeyStream(criteria);
txidStream.on('close', function() {
txidStream.unpipe();
});
};
// tx stream
var txStream = new Transform({ objectMode: true, highWaterMark: 1000 });
AddressService.prototype._transformTxForAddressHistory = function(opts, chunk, enc, callback) {
var streamErr;
txStream.on('end', function() {
var self = this;
var key = self._encoding.decodeAddressIndexKey(chunk);
if (streamErr) {
return callback(streamErr);
self._tx.getTransaction(key.txid, opts, function(err, tx) {
if (err) {
log.error('Address Service: gettransaction ' + err);
opts.stream.emit('error', err);
return callback();
}
callback(null, results);
if (!tx) {
log.error('Address Service: Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.');
opts.stream.emit('error', err);
return callback();
}
});
assert(tx.__height >- 0, 'tx must have a height');
self._header.getBlockHeader(tx.__height, function(err, hash) {
// pipe txids into tx stream for processing
txidStream.pipe(txStream);
txStream._transform = function(chunk, enc, callback) {
var key = self._encoding.decodeAddressIndexKey(chunk);
self._tx.getTransaction(key.txid, options, function(err, tx) {
if (err) {
log.error(err);
txStream.emit('error', err);
if(err) {
log.error('Address Service: getblockheader ' + err);
opts.stream.emit('error', err);
return callback();
}
if (!tx) {
log.error('Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.');
txStream.emit('error', err);
return callback();
}
assert(tx.__height >- 0, 'tx must have a height');
self._header.getBlockHeader(tx.__height, function(err, hash) {
if(err) {
log.error(err);
txStream.emit('error', err);
return callback();
}
tx.__blockhash = hash;
var outputSatoshis = 0;
tx.outputs.forEach(function(output) {
outputSatoshis += output.value;
});
var inputSatoshis = 0;
tx.__inputValues.forEach(function(value) {
inputSatoshis += value;
});
tx.__outputSatoshis = outputSatoshis;
tx.__inputSatoshis = inputSatoshis;
results.push(tx);
callback();
tx.__blockhash = hash;
var outputSatoshis = 0;
tx.outputs.forEach(function(output) {
outputSatoshis += output.value;
});
var inputSatoshis = 0;
tx.__inputValues.forEach(function(value) {
inputSatoshis += value;
});
tx.__outputSatoshis = outputSatoshis;
tx.__inputSatoshis = inputSatoshis;
opts.results.push(tx);
callback();
});
};
});
};
AddressService.prototype._getTxStream = function(address, options) {
var txStream = new Transform({ objectMode: true, highWaterMark: 1000 });
options.stream = txStream;
txStream.on('error', function(err) {
log.error(err);
log.error('Address Service: txstream err: ' + err);
txStream.unpipe();
});
@ -409,6 +387,65 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
callback();
};
txStream._transform = this._transformTxForAddressHistory.bind(this, options);
};
AddressService.prototype._getAddressHistory = function(address, options, callback) {
var self = this;
options = options || {};
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
options.endHeightBuf = new Buffer(4);
options.endHeightBuf.writeUInt32BE(options.end);
options.results = [];
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
async.waterfall([
function(next) {
if (!options.queryMempool) {
return next();
}
self._mempool.getTxidsByAddress(address, next);
},
function(mempoolTxids, next) {
console.log(mempoolTxids);
var txStream = self._getTxStream(address, options);
txStream.on('end', function() {
return callback(null, options.results);
});
if (mempoolTxids) {
var mempoolTxidStream = new Stream.Readable({ objectMode: true });
mempoolTxidStream.pipe(txStream);
mempoolTxids.forEach(function(txid) {
mempoolTxidStream.push(txid);
});
mempoolTxidStream.unpipe();
}
var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txStream);
next();
}
], callback);
};
AddressService.prototype._removeBlock = function(block, callback) {

View File

@ -793,7 +793,7 @@ BlockService.prototype._saveBlock = function(block, callback) {
BlockService.prototype._handleError = function(err) {
if (!this.node.stopping) {
log.error(err);
log.error('Block Service: handle error ' + err);
return this.node.stop();
}
};

View File

@ -29,6 +29,7 @@ var HeaderService = function(options) {
this._lastHeader = null;
this._initialSync = true;
this._originalHeight = 0;
this._lastHeaderCount = 2000;
this._slowMode = options.slowMode;
};
@ -426,6 +427,20 @@ HeaderService.prototype._onHeaders = function(headers) {
var self = this;
// TODO we could be reaching this because either our headers list is a multiple of 2000
// or because our peer returned us an empty list because our checkpoint hash is not on
// its mainchain.
if (headers.length === 0) {
self._onHeadersSave(function(err) {
if (err) {
return self._handleError(err);
}
});
}
// used to tell the header sync loop when to stop
self._lastHeaderCount = headers.length;
if (self._headerInterval) {
clearInterval(self._headerInterval);
self._headerInterval = null;
@ -433,6 +448,10 @@ HeaderService.prototype._onHeaders = function(headers) {
log.debug('Header Service: Received: ' + headers.length + ' header(s).');
if (!headers[0]) {
return;
}
var dbOps = self._getDBOpForLastHeader(headers[0]);
var transformedHeaders = self._transformHeaders(headers);
@ -549,7 +568,10 @@ HeaderService.prototype._startBlockSubscription = function() {
HeaderService.prototype._syncComplete = function() {
return this._tip.height >= this._bestHeight;
// we always ask for the max number of headers, which is 2000.
// so any response with < 2000 means we have reached the end of the headers list.
// we could make an extra call if the number of total headers is multiple of 2000.
return this._lastHeaderCount < 2000;
};

View File

@ -10,6 +10,14 @@ var MempoolService = function(options) {
this._subscriptions.transaction = [];
this._db = this.node.services.db;
this._p2p = this.node.services.p2p;
this._network = this.node.network;
if (this._network === 'livenet') {
this._network = 'main';
}
if (this._network === 'regtest') {
this._network = 'testnet';
}
};
util.inherits(MempoolService, BaseService);
@ -18,7 +26,8 @@ MempoolService.dependencies = ['db'];
MempoolService.prototype.getAPIMethods = function() {
var methods = [
['getMempoolTransaction', this, this.getMempoolTransaction, 1]
['getMempoolTransaction', this, this.getMempoolTransaction, 1],
['getTxidsByAddress', this, this.getTxidsByAddress, 1],
];
return methods;
};
@ -112,7 +121,6 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) {
var self = this;
console.log('query: ', txid);
self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) {
if (err) {
@ -129,6 +137,67 @@ console.log('query: ', txid);
};
MempoolService.prototype.getTxidsByAddress = function(address, callback) {
// given an address, give me all the tx ids for txs that involve this address
// perceivably, our mempool will remain a managable size, so cycling over the whole
// pool won't be too spendy?
var self = this;
var results = [];
var start = self._encoding.encodeMempoolTransactionKey(new Array(65).join('0'));
var end = self._encoding.encodeMempoolTransactionKey(new Array(65).join('f'));
var criteria = {
gte: start,
lte: end
};
var stream = self._db.createReadStream(criteria);
stream.on('end', function() {
return callback(null, results);
});
stream.on('data', function(data) {
var tx = self._encoding.decodeMempoolTransactionValue(data.value);
var txid = self._involvesAddress(tx, address);
if (txid) {
results.push(txid);
}
});
};
MempoolService.prototype._involvesAddress = function(tx, address) {
var _address;
for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
_address = input.getAddress();
if (!_address) {
continue;
}
_address.network = this._network;
_address = _address.toString();
if (address === _address) {
return tx.txid();
}
}
for(i = 0; i < tx.outputs.length; i++) {
var output = tx.outputs[i];
_address = output.getAddress();
if (!_address) {
continue;
}
_address.network = this._network;
_address = _address.toString();
if (address === _address) {
return tx.txid();
}
}
};
MempoolService.prototype.stop = function(callback) {
callback();
};