diff --git a/index.js b/index.js index 2ad6ab2..13e446a 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,7 @@ bitcore.util.preconditions = require('./lib/util/preconditions'); bitcore.transport = {}; bitcore.transport.Peer = require('./lib/transport/peer'); bitcore.transport.Messages = require('./lib/transport/messages'); +bitcore.transport.Pool = require('./lib/transport/pool'); // errors thrown by the library bitcore.errors = require('./lib/errors'); diff --git a/lib/transport/messages.js b/lib/transport/messages.js index e196c1f..179940a 100644 --- a/lib/transport/messages.js +++ b/lib/transport/messages.js @@ -292,11 +292,31 @@ Addresses.prototype.fromBuffer = function(payload) { this.addresses = []; for (var i = 0; i < addrCount; i++) { // TODO: Time actually depends on the version of the other peer (>=31402) + + var time = parser.readUInt32LE(); + var services = parser.readUInt64LEBN(); + + // parse the ipv6 to a string + var ipv6 = []; + for (var a = 0; a < 6; a++) { + ipv6.push(parser.read(2).toString('hex')); + } + ipv6 = ipv6.join(':'); + + // parse the ipv4 to a string + var ipv4 = []; + for (var b = 0; b < 4; b++) { + ipv4.push(parser.read(1)[0]); + } + ipv4 = ipv4.join('.'); + + var port = parser.readUInt16BE(); + this.addresses.push({ - time: parser.readUInt32LE(), - services: parser.readUInt64LEBN(), - ip: parser.read(16), - port: parser.readUInt16BE() + time: time, + services: services, + ip: { v6: ipv6, v4: ipv4 }, + port: port }); } diff --git a/lib/transport/peer.js b/lib/transport/peer.js index 724cd2e..aea73bd 100644 --- a/lib/transport/peer.js +++ b/lib/transport/peer.js @@ -65,7 +65,7 @@ function Peer(host, port, network) { }); this.on('ping', function(message) { - self.sendPong(message.nonce); + self._sendPong(message.nonce); }); } diff --git a/lib/transport/pool.js b/lib/transport/pool.js new file mode 100644 index 0000000..d99033a --- /dev/null +++ b/lib/transport/pool.js @@ -0,0 +1,273 @@ +'use strict'; + +var dns = require('dns'); +var EventEmitter = require('events').EventEmitter; +var Networks = require('../networks'); +var sha256 = require('../crypto/hash').sha256; +var Peer = require('./peer'); +var util = require('util'); + +function now() { + return Math.floor(new Date().getTime() / 1000); +} + +/** + * A pool is a collection of Peers. A pool will discover peers from DNS seeds, and + * collect information about new peers in the network. When a peer disconnects the pool + * will connect to others that are available to maintain a max number of + * ongoing peer connections. Peer events are relayed to the pool. + * + * @example + * + * var pool = new Pool(Networks.livenet); + * pool.on('peerinv', function(peer, message) { + * // do something with the inventory announcement + * }); + * pool.connect(); + * + * @param {Network|String} network - The network to connect + * @returns {Pool} + * @constructor + */ +function Pool(network) { + + var self = this; + + this.network = Networks.get(network) || Networks.defaultNetwork; + this.keepalive = false; + this._connectedPeers = {}; + this._addrs = []; + + this.on('peeraddr', function peerAddrEvent(peer, message) { + var addrs = message.addresses; + var length = addrs.length; + for (var i = 0; i < length; i++) { + var addr = addrs[i]; + // In case of an invalid time, assume "5 days ago" + if (addr.time <= 100000000 || addr.time > (now() + 10 * 60)) { + addr.time = now() - 5 * 24 * 60 * 60; + } + this._addAddr(addr); + } + }); + + this.on('seed', function seedEvent(ips) { + ips.forEach(function(ip) { + self._addAddr({ + ip: { + v4: ip + } + }); + }); + if (self.keepalive) { + self._fillConnections(); + } + }); + + this.on('peerdisconnect', function peerDisconnectEvent(peer, addr) { + self._deprioritizeAddr(addr); + self._removeConnectedPeer(addr); + if (self.keepalive) { + self._fillConnections(); + } + }); + + return this; + +} + +util.inherits(Pool, EventEmitter); + +Pool.MaxConnectedPeers = 8; +Pool.RetrySeconds = 30; +Pool.PeerEvents = ['version', 'inv', 'getdata', 'ping', 'ping', 'addr', + 'getaddr', 'verack', 'reject', 'alert', 'headers', 'block', + 'tx', 'getblocks', 'getheaders' +]; + + +/** + * Will initiatiate connection to peers, if available peers have been added to + * the pool, it will connect to those, otherwise will use DNS seeds to find + * peers to connect. When a peer disconnects it will add another. + */ +Pool.prototype.connect = function connect() { + this.keepalive = true; + var self = this; + if (self._addrs.length === 0) { + self._addAddrsFromSeeds(); + } else { + self._fillConnections(); + } + return this; +}; + + +/** + * Will disconnect all peers that are connected. + */ +Pool.prototype.disconnect = function disconnect() { + this.keepalive = false; + for (var i in this._connectedPeers) { + this._connectedPeers[i].disconnect(); + } + return this; +}; + +/** + * @returns {Number} The number of peers currently connected. + */ +Pool.prototype.numberConnected = function numberConnected() { + return Object.keys(this._connectedPeers).length; +}; + +/** + * Will fill the conneted peers to the maximum amount. + */ +Pool.prototype._fillConnections = function _fillConnections() { + var length = this._addrs.length; + for (var i = 0; i < length; i++) { + if (this.numberConnected() >= Pool.MaxConnectedPeers) { + break; + } + var addr = this._addrs[i]; + if (!addr.retryTime || now() > addr.retryTime) { + this._connectPeer(addr); + } + } + return this; +}; + +/** + * Will remove a peer from the list of connected peers. + * @param {Object} addr - An addr from the list of addrs + */ +Pool.prototype._removeConnectedPeer = function _removeConnectedPeer(addr) { + if (this._connectedPeers[addr.hash].status !== Peer.STATUS.DISCONNECTED) { + this._connectedPeers[addr.hash].disconnect(); + } else { + delete this._connectedPeers[addr.hash]; + } + return this; +}; + +/** + * Will connect a peer and add to the list of connected peers. + * @param {Object} addr - An addr from the list of addrs + */ +Pool.prototype._connectPeer = function _connectPeer(addr) { + var self = this; + + function addConnectedPeer(addr) { + var port = addr.port || self.network.port; + var ip = addr.ip.v4 || addr.ip.v6; + var peer = new Peer(ip, port, self.network); + peer.on('disconnect', function peerDisconnect() { + self.emit('peerdisconnect', peer, addr); + }); + peer.on('ready', function peerReady() { + self.emit('peerready', peer, addr); + }); + Pool.PeerEvents.forEach(function addPeerEvents(event) { + peer.on(event, function peerEvent(message) { + self.emit('peer' + event, peer, message); + }); + }); + peer.connect(); + self._connectedPeers[addr.hash] = peer; + } + + if (!this._connectedPeers[addr.hash]) { + addConnectedPeer(addr); + } + + return this; +}; + +/** + * Will deprioritize an addr in the list of addrs by moving it to the end + * of the array, and setting a retryTime + * @param {Object} addr - An addr from the list of addrs + */ +Pool.prototype._deprioritizeAddr = function _deprioritizeAddr(addr) { + for (var i = 0; i < this._addrs.length; i++) { + if (this._addrs[i].hash === addr.hash) { + var middle = this._addrs[i]; + middle.retryTime = now() + Pool.RetrySeconds; + var beginning = this._addrs.splice(0, i); + var end = this._addrs.splice(i + 1, this._addrs.length); + var combined = beginning.concat(end); + this._addrs = combined.concat([middle]); + } + } + return this; +}; + +/** + * Will add an addr to the beginning of the addrs array + * @param {Object} + */ +Pool.prototype._addAddr = function _addAddr(addr) { + + // make a unique key + addr.hash = sha256(new Buffer(addr.ip.v6 + addr.ip.v4 + addr.port)).toString('hex'); + + var length = this._addrs.length; + var exists = false; + for (var i = 0; i < length; i++) { + if (this._addrs[i].hash === addr.hash) { + exists = true; + } + } + if (!exists) { + this._addrs.unshift(addr); + } + return this; +}; + +/** + * Will add addrs to the list of addrs from a DNS seed + * @param {String} seed - A domain name to resolve known peers + * @param {Function} done + */ +Pool.prototype._addAddrsFromSeed = function _addAddrsFromSeed(seed) { + var self = this; + dns.resolve(seed, function(err, ips) { + if (err) { + self.emit('seederror', err); + return; + } + if (!ips || !ips.length) { + self.emit('seederror', new Error('No IPs found from seed lookup.')); + return; + } + // announce to pool + self.emit('seed', ips); + }); + return this; +}; + +/** + * Will add addrs to the list of addrs from network DNS seeds + * @param {Function} done + */ +Pool.prototype._addAddrsFromSeeds = function _addAddrsFromSeeds() { + var self = this; + var seeds = this.network.dnsSeeds; + seeds.forEach(function(seed) { + self._addAddrsFromSeed(seed); + }); + return this; +}; + +/** + * @returns {String} A string formatted for the console + */ +Pool.prototype.inspect = function inspect() { + return ''; +}; + +module.exports = Pool; diff --git a/package.json b/package.json index bfe4649..3dc1ac2 100644 --- a/package.json +++ b/package.json @@ -98,6 +98,7 @@ "gulp-shell": "^0.2.10", "mocha": "~2.0.1", "run-sequence": "^1.0.2", + "sinon": "^1.12.2", "karma": "^0.12.28", "karma-firefox-launcher": "^0.1.3", "karma-mocha": "^0.1.9" diff --git a/test/transport/peer.js b/test/transport/peer.js index 9733a22..066ee7a 100644 --- a/test/transport/peer.js +++ b/test/transport/peer.js @@ -49,21 +49,27 @@ describe('Peer', function() { peer.port.should.equal(8111); }); - // only for node TODO: (yemel) - it.skip('should be able to set a proxy', function() { - var peer, peer2, socket; + if (typeof(window) === 'undefined'){ - peer = new Peer('localhost'); - expect(peer.proxy).to.be.undefined(); - socket = peer._getSocket(); - socket.should.be.instanceof(Net.Socket); + // Node.js Tests - peer2 = peer.setProxy('127.0.0.1', 9050); - peer2.proxy.host.should.equal('127.0.0.1'); - peer2.proxy.port.should.equal(9050); - socket = peer2._getSocket(); - socket.should.be.instanceof(Socks5Client); + it('should be able to set a proxy', function() { + var peer, peer2, socket; + + peer = new Peer('localhost'); + expect(peer.proxy).to.be.undefined(); + socket = peer._getSocket(); + socket.should.be.instanceof(Net.Socket); + + peer2 = peer.setProxy('127.0.0.1', 9050); + peer2.proxy.host.should.equal('127.0.0.1'); + peer2.proxy.port.should.equal(9050); + socket = peer2._getSocket(); + socket.should.be.instanceof(Socks5Client); + + peer.should.equal(peer2); + }); + + } - peer.should.equal(peer2); - }); }); diff --git a/test/transport/pool.js b/test/transport/pool.js new file mode 100644 index 0000000..985723b --- /dev/null +++ b/test/transport/pool.js @@ -0,0 +1,98 @@ +'use strict'; + +if (typeof(window) === 'undefined'){ + + // Node.js Tests + + var chai = require('chai'); + + /* jshint unused: false */ + var should = chai.should(); + var expect = chai.expect; + + var dns = require('dns'); + var sinon = require('sinon'); + + var bitcore = require('../..'); + var Peer = bitcore.transport.Peer; + var MessagesData = require('../data/messages'); + var Messages = bitcore.transport.Messages; + var Pool = bitcore.transport.Pool; + var Networks = bitcore.Networks; + + describe('Pool', function() { + + it('should be able to create instance', function() { + var pool = new Pool(); + pool.network.should.equal(Networks.livenet); + }); + + it('should be able to create instance setting the network', function() { + var pool = new Peer(Networks.testnet); + pool.network.should.equal(Networks.livenet); + }); + + it('should discover peers via dns', function() { + var stub = sinon.stub(dns, 'resolve', function(seed, callback){ + callback(null, ['10.10.10.1', '10.10.10.2', '10.10.10.3']); + }); + var pool = new Pool(Networks.livenet); + pool.connect(); + pool.disconnect(); + pool._addrs.length.should.equal(3); + stub.restore(); + }); + + it('should not discover peers via dns', function() { + var pool = new Pool(); + pool._addAddr({ip: {v4: '10.10.10.1'}}); + pool.connect(); + pool.disconnect(); + pool._addrs.length.should.equal(1); + }); + + it('should add new addrs as they are announced over the network', function(done) { + + // only emit an event, no need to connect + var peerConnectStub = sinon.stub(Peer.prototype, 'connect', function(){ + this._readMessage(); + this.emit('ready'); + }); + + // mock a addr peer event + var peerMessageStub = sinon.stub(Peer.prototype, '_readMessage', function(){ + var payload = new Buffer(MessagesData.ADDR.payload, 'hex'); + var message = new Messages.Addresses().fromBuffer(payload); + this.emit(message.command, message); + }); + + var pool = new Pool(); + + pool._addAddr({ip: {v4: 'localhost'}}); + + // listen for the event + pool.on('peeraddr', function(peer, message) { + pool._addrs.length.should.equal(502); + + // restore stubs + peerConnectStub.restore(); + peerMessageStub.restore(); + + for (var i = 0; i < pool._addrs.length; i++) { + should.exist(pool._addrs[i].hash); + should.exist(pool._addrs[i].ip); + should.exist(pool._addrs[i].ip.v4); + } + + // done + done(); + }); + + pool.connect(); + + }); + + + }); + +}