diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 02985431..1985e7ed 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -13,6 +13,7 @@ var Transform = require('stream').Transform; var assert = require('assert'); var AddressService = function(options) { + BaseService.call(this, options); this._tx = this.node.services.transaction; this._header = this.node.services.header; @@ -50,6 +51,12 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba options.to = options.to || 0xffffffff; options.queryMempool = _.isUndefined(options.queryMempool) ? true : false; + var txIdList = []; + + if (_.isString(addresses)) { + addresses = [addresses]; + } + async.mapLimit(addresses, 4, function(address, next) { self._getAddressHistory(address, options, next); @@ -60,7 +67,19 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba return callback(err); } - var txList = _.flattenDeep(txLists); + txLists = _.flattenDeep(txLists); + var txList = []; + + for(var i = 0; i < txLists.length; i++) { + var tx = txLists[i]; + if (txIdList.indexOf(tx.txid()) !== -1) { + continue; + } + txIdList.push(tx.txid()); + txList.push(tx); + } + + txList.reverse(); var results = { totalCount: txList.length, @@ -336,7 +355,7 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac self._tx.getTransaction(key.txid, options, function(err, tx) { - if(err) { + if (err) { log.error(err); txStream.emit('error', err); return callback(); diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index be9a51b5..d77e09cf 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -112,6 +112,7 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) { var self = this; +console.log('query: ', txid); self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) { if (err) { diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index a2119fbd..0aa5e995 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -8,7 +8,9 @@ var log = index.log; var BaseService = require('../../service'); var assert = require('assert'); var Bcoin = require('./bcoin'); +var BcoinTx = require('bcoin').tx; var Networks = require('bitcore-lib').Networks; +var LRU = require('lru-cache'); var P2P = function(options) { @@ -24,6 +26,7 @@ var P2P = function(options) { this._bcoin = null; this._currentBestHeight = null; this._latestBits = 0x1d00ffff; + this._outgoingTxs = LRU(100); // these are outgoing txs that are awaiting getdata messages }; util.inherits(P2P, BaseService); @@ -110,8 +113,27 @@ P2P.prototype.getPublishEvents = function() { }; -P2P.prototype.sendTransaction = function(tx) { - p2p.sendMessage(this.messages.Inventory(tx)); +P2P.prototype.sendTransaction = function(tx, callback) { + var peer = this._getPeer(); + + var bcoinTx; + try { + bcoinTx = BcoinTx.fromRaw(tx, 'hex'); + } catch(e) { + return callback(e); + } + + log.info('P2P Service: sending transaction: ' + bcoinTx.txid()); + + this._outgoingTxs.set(bcoinTx.txid(), bcoinTx); + var inv = p2p.Inventory.forTransaction(bcoinTx.txid()); + var txMessage = this.messages.Inventory([inv]); + + peer.sendMessage(txMessage); + + this._onPeerTx(peer, { transaction: bcoinTx }); + + return callback(null, bcoinTx.txid()); }; @@ -241,7 +263,7 @@ P2P.prototype._initP2P = function() { if (this.node.network === 'regtest') { Networks.enableRegtest(); } - this.messages = new p2p.Messages({ network: Networks.get(this.node.network) }); + this.messages = new p2p.Messages({ network: Networks.get(this.node.network), Transaction: BcoinTx }); this._peerHeights = []; this._peers = []; this._peerIndex = 0; @@ -285,6 +307,15 @@ P2P.prototype._onPeerDisconnect = function(peer, addr) { }; +P2P.prototype._onPeerGetData = function(peer, message) { + // we can only respond to tx messages + var txId = message.inventory[0].hash.reverse().toString('hex'); + var tx = this._outgoingTxs.get(txId); + if (tx) { + peer.sendMessage(this.messages.Transaction(tx, { Transaction: BcoinTx })); + } +}; + P2P.prototype._onPeerHeaders = function(peer, message) { this._broadcast(this.subscriptions.headers, 'p2p/headers', message.headers); }; @@ -371,6 +402,7 @@ P2P.prototype._setListeners = function() { self._pool.on('peertx', self._onPeerTx.bind(self)); self._pool.on('peerblock', self._onPeerBlock.bind(self)); self._pool.on('peerheaders', self._onPeerHeaders.bind(self)); + self._pool.on('peergetdata', self._onPeerGetData.bind(self)); self.node.on('ready', self._connect.bind(self)); }; diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 5e92676d..6c64659f 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -126,6 +126,9 @@ TransactionService.prototype._getMempoolTransaction = function(txid, tx, options var self = this; var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool; + if (!tx) { + console.log(txid); + } if (tx || !queryMempool) { return callback(null, tx, options); } @@ -220,10 +223,6 @@ TransactionService.prototype._getInputValues = function(tx, options, callback) { }; -TransactionService.prototype.sendTransaction = function(tx, callback) { - this._p2p.sendTransaction(tx, callback); -}; - TransactionService.prototype.start = function(callback) { var self = this; diff --git a/test/services/header/index.unit.js b/test/services/header/index.unit.js index 7821e550..b3084a0e 100644 --- a/test/services/header/index.unit.js +++ b/test/services/header/index.unit.js @@ -41,7 +41,7 @@ describe('Header Service', function() { var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 123, hash: 'a' }); var setListeners = sandbox.stub(headerService, '_setListeners'); var getPrefix = sandbox.stub().callsArgWith(1, null, new Buffer('ffee', 'hex')); - var getLastHeader = sandbox.stub(headerService, '_getLastHeader').callsArgWith(0, null); + var adjustHeadersForCheckPointTip = sandbox.stub(headerService, '_adjustHeadersForCheckPointTip').callsArgWith(0, null); var setGenesisBlock = sandbox.stub(headerService, '_setGenesisBlock').callsArgWith(0, null); headerService.GENESIS_HASH = '00'; var openBus = sandbox.stub(); @@ -52,7 +52,7 @@ describe('Header Service', function() { headerService.start(function() { expect(setGenesisBlock.calledOnce).to.be.true; - expect(getLastHeader.calledOnce).to.be.false; + expect(adjustHeadersForCheckPointTip.calledOnce).to.be.false; expect(setListeners.calledOnce).to.be.true; expect(headerService._tip).to.be.deep.equal({ height: 0, hash: '00' }); expect(headerService._encoding).to.be.instanceOf(Encoding); @@ -168,7 +168,7 @@ describe('Header Service', function() { }); - describe('#_getLastHeader', function() { + describe('#_adjustHeadersForCheckPointTip', function() { it('should get the last header from which to start synchronizing more headers', function(done) { var stream = new Emitter(); @@ -181,7 +181,7 @@ describe('Header Service', function() { createReadStream: sandbox.stub().returns(stream), batch: sandbox.stub().callsArgWith(1, null) }; - headerService._getLastHeader(function(err) { + headerService._adjustHeadersForCheckPointTip(function(err) { if(err) { return done(err); }