From 7756fc2c739c66c8e2db06a30a6c813a19e5d00d Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 24 May 2016 11:36:35 -0700 Subject: [PATCH] ensure packets are served in order. --- lib/bcoin/peer.js | 122 +++++++++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 39 deletions(-) diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 165088ff..c3dd033f 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -85,6 +85,7 @@ function Peer(pool, options) { this.network = this.chain.network; this.parser = new bcoin.protocol.parser({ network: this.network }); this.framer = new bcoin.protocol.framer({ network: this.network }); + this.locker = new bcoin.locker(this); this.version = null; this.destroyed = false; this.ack = false; @@ -814,7 +815,7 @@ Peer.prototype.getUTXOs = function getUTXOs(utxos, callback) { }); }; -Peer.prototype._getUTXOs = function getUTXOs(utxos, callback) { +Peer.prototype._getUTXOs = function _getUTXOs(utxos, callback) { var index = 0; var i, prevout, coin; @@ -851,11 +852,23 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { var coins = []; var hits = []; - if (this.pool.options.selfish) + var unlock = this.locker.lock(_handleGetUTXOs, [payload]); + if (!unlock) return; + function done(err) { + if (err) { + self.emit('error', err); + return unlock(); + } + unlock(); + } + + if (this.pool.options.selfish) + return done(); + if (this.chain.db.options.spv) - return; + return done(); function checkMempool(hash, index, callback) { if (!self.mempool) @@ -878,7 +891,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { } if (payload.prevout.length > 15) - return; + return done(); utils.forEachSerial(payload.prevout, function(prevout, next, i) { var hash = prevout.hash; @@ -921,7 +934,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { }); }, function(err) { if (err) - self.emit('error', err); + return done(err); self.write(self.framer.UTXOs({ height: self.chain.height, @@ -929,6 +942,8 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { hits: hits, coins: coins })); + + done(); }); }; @@ -936,14 +951,27 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) { var self = this; var headers = []; - if (this.pool.options.selfish) + var unlock = this.locker.lock(_handleGetHeaders, [payload]); + if (!unlock) return; + function done(err) { + if (err) { + self.emit('error', err); + return unlock(); + } + self.sendHeaders(headers); + unlock(); + } + + if (this.pool.options.selfish) + return done(); + if (this.chain.db.options.spv) - return; + return done(); if (this.chain.db.options.prune) - return; + return done(); function collect(err, hash) { if (err) @@ -979,13 +1007,6 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) { }); } - function done(err) { - if (err) - return self.emit('error', err); - - self.sendHeaders(headers); - } - if (!payload.locator) return collect(null, payload.stop); @@ -1004,21 +1025,28 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) { var self = this; var blocks = []; - if (this.pool.options.selfish) - return; - - if (this.chain.db.options.spv) - return; - - if (this.chain.db.options.prune) + var unlock = this.locker.lock(_handleGetBlocks, [payload]); + if (!unlock) return; function done(err) { - if (err) - return self.emit('error', err); + if (err) { + self.emit('error', err); + return unlock(); + } self.sendInv(blocks); + unlock(); } + if (this.pool.options.selfish) + return done(); + + if (this.chain.db.options.spv) + return done(); + + if (this.chain.db.options.prune) + return done(); + this.chain.findLocator(payload.locator, function(err, tip) { if (err) return done(err); @@ -1050,7 +1078,7 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) { }); }; -Peer.prototype._handleVersion = function handleVersion(payload) { +Peer.prototype._handleVersion = function _handleVersion(payload) { var version = payload.version; var services = payload.services; @@ -1139,14 +1167,28 @@ Peer.prototype._handleMempool = function _handleMempool() { }); }; -Peer.prototype._handleGetData = function handleGetData(items) { +Peer.prototype._handleGetData = function _handleGetData(items) { var self = this; var check = []; var notfound = []; var i, item, entry, witness; - if (items.length > 50000) - return this._error('message getdata size() = %d', items.length); + var unlock = this.locker.lock(_handleGetdata, [items]); + if (!unlock) + return; + + function done(err) { + if (err) { + self.emit('error', err); + return unlock(); + } + unlock(); + } + + if (items.length > 50000) { + this._error('getdata size too large (%s).', items.length); + return done(); + } for (i = 0; i < items.length; i++) { item = items[i]; @@ -1177,7 +1219,7 @@ Peer.prototype._handleGetData = function handleGetData(items) { } if (this.pool.options.selfish) - return; + return done(); utils.forEachSerial(check, function(item, next) { var type = item.type & ~constants.WITNESS_MASK; @@ -1231,7 +1273,7 @@ Peer.prototype._handleGetData = function handleGetData(items) { } if (self.chain.db.options.prune) { notfound.push({ type: constants.inv.BLOCK, hash: hash }); - return; + return next(); } return self.chain.db.getBlock(hash, function(err, block) { if (err) @@ -1267,7 +1309,7 @@ Peer.prototype._handleGetData = function handleGetData(items) { } if (self.chain.db.options.prune) { notfound.push({ type: constants.inv.BLOCK, hash: hash }); - return; + return next(); } return self.chain.db.getBlock(hash, function(err, block) { if (err) @@ -1319,10 +1361,12 @@ Peer.prototype._handleGetData = function handleGetData(items) { if (notfound.length > 0) self.write(self.framer.notFound(notfound)); + + done(); }); }; -Peer.prototype._handleAddr = function handleAddr(addrs) { +Peer.prototype._handleAddr = function _handleAddr(addrs) { var now = utils.now(); var i, addr, ts; @@ -1353,12 +1397,12 @@ Peer.prototype._handleAddr = function handleAddr(addrs) { this.hostname); }; -Peer.prototype._handlePing = function handlePing(data) { +Peer.prototype._handlePing = function _handlePing(data) { this.write(this.framer.pong(data)); this.fire('ping', this.minPing); }; -Peer.prototype._handlePong = function handlePong(data) { +Peer.prototype._handlePong = function _handlePong(data) { var now = utils.ms(); if (!this.challenge) { @@ -1390,7 +1434,7 @@ Peer.prototype._handlePong = function handlePong(data) { this.fire('pong', this.minPing); }; -Peer.prototype._handleGetAddr = function handleGetAddr() { +Peer.prototype._handleGetAddr = function _handleGetAddr() { var hosts = {}; var items = []; var ts = utils.now() - (process.uptime() | 0); @@ -1436,7 +1480,7 @@ Peer.prototype._handleGetAddr = function handleGetAddr() { return this.write(this.framer.addr(items)); }; -Peer.prototype._handleInv = function handleInv(items) { +Peer.prototype._handleInv = function _handleInv(items) { var blocks = []; var txs = []; var i, item, unknown; @@ -1466,7 +1510,7 @@ Peer.prototype._handleInv = function handleInv(items) { bcoin.debug('Peer sent an unknown inv type: %d (%s).', unknown); }; -Peer.prototype._handleHeaders = function handleHeaders(headers) { +Peer.prototype._handleHeaders = function _handleHeaders(headers) { headers = headers.map(function(header) { return new bcoin.headers(header); }); @@ -1474,7 +1518,7 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) { this.fire('headers', headers); }; -Peer.prototype._handleReject = function handleReject(payload) { +Peer.prototype._handleReject = function _handleReject(payload) { var hash, entry; this.fire('reject', payload); @@ -1491,7 +1535,7 @@ Peer.prototype._handleReject = function handleReject(payload) { entry.reject(this); }; -Peer.prototype._handleAlert = function handleAlert(details) { +Peer.prototype._handleAlert = function _handleAlert(details) { var hash = utils.dsha256(details.payload); var signature = details.signature;