Fixing recently sent items.

This commit is contained in:
Chris Kleeschulte 2017-10-04 08:34:52 -04:00
parent d81c1b9966
commit 1f4c5e5e1f
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
5 changed files with 64 additions and 13 deletions

View File

@ -13,6 +13,7 @@ var Transform = require('stream').Transform;
var assert = require('assert');
var AddressService = function(options) {
BaseService.call(this, options);
this._tx = this.node.services.transaction;
this._header = this.node.services.header;
@ -50,6 +51,12 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
options.to = options.to || 0xffffffff;
options.queryMempool = _.isUndefined(options.queryMempool) ? true : false;
var txIdList = [];
if (_.isString(addresses)) {
addresses = [addresses];
}
async.mapLimit(addresses, 4, function(address, next) {
self._getAddressHistory(address, options, next);
@ -60,7 +67,19 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba
return callback(err);
}
var txList = _.flattenDeep(txLists);
txLists = _.flattenDeep(txLists);
var txList = [];
for(var i = 0; i < txLists.length; i++) {
var tx = txLists[i];
if (txIdList.indexOf(tx.txid()) !== -1) {
continue;
}
txIdList.push(tx.txid());
txList.push(tx);
}
txList.reverse();
var results = {
totalCount: txList.length,
@ -336,7 +355,7 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac
self._tx.getTransaction(key.txid, options, function(err, tx) {
if(err) {
if (err) {
log.error(err);
txStream.emit('error', err);
return callback();

View File

@ -112,6 +112,7 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) {
var self = this;
console.log('query: ', txid);
self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) {
if (err) {

View File

@ -8,7 +8,9 @@ var log = index.log;
var BaseService = require('../../service');
var assert = require('assert');
var Bcoin = require('./bcoin');
var BcoinTx = require('bcoin').tx;
var Networks = require('bitcore-lib').Networks;
var LRU = require('lru-cache');
var P2P = function(options) {
@ -24,6 +26,7 @@ var P2P = function(options) {
this._bcoin = null;
this._currentBestHeight = null;
this._latestBits = 0x1d00ffff;
this._outgoingTxs = LRU(100); // these are outgoing txs that are awaiting getdata messages
};
util.inherits(P2P, BaseService);
@ -110,8 +113,27 @@ P2P.prototype.getPublishEvents = function() {
};
P2P.prototype.sendTransaction = function(tx) {
p2p.sendMessage(this.messages.Inventory(tx));
P2P.prototype.sendTransaction = function(tx, callback) {
var peer = this._getPeer();
var bcoinTx;
try {
bcoinTx = BcoinTx.fromRaw(tx, 'hex');
} catch(e) {
return callback(e);
}
log.info('P2P Service: sending transaction: ' + bcoinTx.txid());
this._outgoingTxs.set(bcoinTx.txid(), bcoinTx);
var inv = p2p.Inventory.forTransaction(bcoinTx.txid());
var txMessage = this.messages.Inventory([inv]);
peer.sendMessage(txMessage);
this._onPeerTx(peer, { transaction: bcoinTx });
return callback(null, bcoinTx.txid());
};
@ -241,7 +263,7 @@ P2P.prototype._initP2P = function() {
if (this.node.network === 'regtest') {
Networks.enableRegtest();
}
this.messages = new p2p.Messages({ network: Networks.get(this.node.network) });
this.messages = new p2p.Messages({ network: Networks.get(this.node.network), Transaction: BcoinTx });
this._peerHeights = [];
this._peers = [];
this._peerIndex = 0;
@ -285,6 +307,15 @@ P2P.prototype._onPeerDisconnect = function(peer, addr) {
};
P2P.prototype._onPeerGetData = function(peer, message) {
// we can only respond to tx messages
var txId = message.inventory[0].hash.reverse().toString('hex');
var tx = this._outgoingTxs.get(txId);
if (tx) {
peer.sendMessage(this.messages.Transaction(tx, { Transaction: BcoinTx }));
}
};
P2P.prototype._onPeerHeaders = function(peer, message) {
this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers);
};
@ -371,6 +402,7 @@ P2P.prototype._setListeners = function() {
self._pool.on('peertx', self._onPeerTx.bind(self));
self._pool.on('peerblock', self._onPeerBlock.bind(self));
self._pool.on('peerheaders', self._onPeerHeaders.bind(self));
self._pool.on('peergetdata', self._onPeerGetData.bind(self));
self.node.on('ready', self._connect.bind(self));
};

View File

@ -126,6 +126,9 @@ TransactionService.prototype._getMempoolTransaction = function(txid, tx, options
var self = this;
var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool;
if (!tx) {
console.log(txid);
}
if (tx || !queryMempool) {
return callback(null, tx, options);
}
@ -220,10 +223,6 @@ TransactionService.prototype._getInputValues = function(tx, options, callback) {
};
TransactionService.prototype.sendTransaction = function(tx, callback) {
this._p2p.sendTransaction(tx, callback);
};
TransactionService.prototype.start = function(callback) {
var self = this;

View File

@ -41,7 +41,7 @@ describe('Header Service', function() {
var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 123, hash: 'a' });
var setListeners = sandbox.stub(headerService, '_setListeners');
var getPrefix = sandbox.stub().callsArgWith(1, null, new Buffer('ffee', 'hex'));
var getLastHeader = sandbox.stub(headerService, '_getLastHeader').callsArgWith(0, null);
var adjustHeadersForCheckPointTip = sandbox.stub(headerService, '_adjustHeadersForCheckPointTip').callsArgWith(0, null);
var setGenesisBlock = sandbox.stub(headerService, '_setGenesisBlock').callsArgWith(0, null);
headerService.GENESIS_HASH = '00';
var openBus = sandbox.stub();
@ -52,7 +52,7 @@ describe('Header Service', function() {
headerService.start(function() {
expect(setGenesisBlock.calledOnce).to.be.true;
expect(getLastHeader.calledOnce).to.be.false;
expect(adjustHeadersForCheckPointTip.calledOnce).to.be.false;
expect(setListeners.calledOnce).to.be.true;
expect(headerService._tip).to.be.deep.equal({ height: 0, hash: '00' });
expect(headerService._encoding).to.be.instanceOf(Encoding);
@ -168,7 +168,7 @@ describe('Header Service', function() {
});
describe('#_getLastHeader', function() {
describe('#_adjustHeadersForCheckPointTip', function() {
it('should get the last header from which to start synchronizing more headers', function(done) {
var stream = new Emitter();
@ -181,7 +181,7 @@ describe('Header Service', function() {
createReadStream: sandbox.stub().returns(stream),
batch: sandbox.stub().callsArgWith(1, null)
};
headerService._getLastHeader(function(err) {
headerService._adjustHeadersForCheckPointTip(function(err) {
if(err) {
return done(err);
}