From 5023c7efe4816e61bb64d7ab919e19697167bcdc Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 23 Mar 2016 03:52:11 -0700 Subject: [PATCH] add locker. refactor. --- lib/bcoin.js | 1 + lib/bcoin/address.js | 2 - lib/bcoin/chain.js | 89 +++--------------------------- lib/bcoin/locker.js | 125 +++++++++++++++++++++++++++++++++++++++++++ lib/bcoin/mempool.js | 44 +++++++++------ lib/bcoin/pool.js | 11 ++-- lib/bcoin/utils.js | 2 +- lib/bcoin/workers.js | 6 +-- 8 files changed, 169 insertions(+), 111 deletions(-) create mode 100644 lib/bcoin/locker.js diff --git a/lib/bcoin.js b/lib/bcoin.js index f7829ebc..548ed0de 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -83,6 +83,7 @@ bcoin.debug = function debug() { bcoin.utils = utils; bcoin.utils.debug = bcoin.debug; bcoin.utils.ensurePrefix = bcoin.ensurePrefix; +bcoin.locker = require('./bcoin/locker'); bcoin.reader = require('./bcoin/reader'); bcoin.writer = require('./bcoin/writer'); bcoin.profiler = require('./bcoin/profiler'); diff --git a/lib/bcoin/address.js b/lib/bcoin/address.js index 416401d5..a899b740 100644 --- a/lib/bcoin/address.js +++ b/lib/bcoin/address.js @@ -558,7 +558,6 @@ Address.prototype.toJSON = function toJSON(passphrase) { key: key.toJSON(passphrase), type: this.type, witness: this.witness, - redeem: this.redeem ? utils.toHex(this.redeem) : null, keys: this.keys.map(utils.toBase58), m: this.m, n: this.n @@ -583,7 +582,6 @@ Address.fromJSON = function fromJSON(json, passphrase) { key: bcoin.keypair.fromJSON(json.key, passphrase), type: json.type, witness: json.witness, - redeem: json.redeem ? new Buffer(json.redeem, 'hex') : null, keys: json.keys.map(utils.fromBase58), m: json.m, n: json.n diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 04479c1c..9071269a 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -33,14 +33,10 @@ function Chain(node, options) { this.loaded = false; this.mempool = node.mempool; this.db = new bcoin.chaindb(this, options); - this.busy = false; - this.jobs = []; - this.pending = []; - this.pendingBlocks = {}; - this.pendingSize = 0; this.total = 0; - this.orphanLimit = options.orphanLimit || 20 * 1024 * 1024; - this.pendingLimit = options.pendingLimit || 20 * 1024 * 1024; + this.orphanLimit = options.orphanLimit || (20 << 20); + this.pendingLimit = options.pendingLimit || (20 << 20); + this.locker = new bcoin.locker(this, this.add, this.pendingLimit); this.invalid = {}; this.bestHeight = -1; this.lastUpdate = utils.now(); @@ -181,61 +177,7 @@ Chain.prototype.open = function open(callback) { }; Chain.prototype._lock = function _lock(func, args, force) { - var self = this; - var block, called; - - if (force) { - assert(this.busy); - return function unlock() { - assert(!called); - called = true; - }; - } - - if (this.busy) { - if (func === Chain.prototype.add) { - block = args[0]; - this.pending.push(block); - this.pendingBlocks[block.hash('hex')] = true; - this.pendingSize += block.getSize(); - if (this.pendingSize > this.pendingLimit) { - this.purgePending(); - return; - } - } - this.jobs.push([func, args]); - return; - } - - this.busy = true; - - return function unlock() { - var item, block; - - assert(!called); - called = true; - - self.busy = false; - - if (func === Chain.prototype.add) { - if (self.pending.length === 0) - self.emit('flush'); - } - - if (self.jobs.length === 0) - return; - - item = self.jobs.shift(); - - if (item[0] === Chain.prototype.add) { - block = item[1][0]; - assert(block === self.pending.shift()); - delete self.pendingBlocks[block.hash('hex')]; - self.pendingSize -= block.getSize(); - } - - item[0].apply(self, item[1]); - }; + return this.locker.lock(func, args, force); }; // Stream headers from electrum.org for quickly @@ -962,10 +904,7 @@ Chain.prototype.resetTime = function resetTime(ts, callback, force) { }; Chain.prototype.onFlush = function onFlush(callback) { - if (this.pending.length === 0) - return callback(); - - this.once('flush', callback); + return this.locker.onFlush(callback); }; Chain.prototype.add = function add(initial, peer, callback, force) { @@ -1300,21 +1239,7 @@ Chain.prototype.pruneOrphans = function pruneOrphans(peer) { }; Chain.prototype.purgePending = function purgePending() { - var self = this; - - utils.debug('Warning: %dmb of pending blocks. Purging.', - utils.mb(this.pendingSize)); - - this.pending.forEach(function(block) { - delete self.pendingBlocks[block.hash('hex')]; - }); - - this.pending.length = 0; - this.pendingSize = 0; - - this.jobs = this.jobs.filter(function(item) { - return item[0] !== Chain.prototype.add; - }); + return this.locker.purgePending(); }; Chain.prototype.has = function has(hash, callback) { @@ -1409,7 +1334,7 @@ Chain.prototype.hasPending = function hasPending(hash) { else if (hash.hash) hash = hash.hash('hex'); - return !!this.pendingBlocks[hash]; + return this.locker.hasPending(hash); }; Chain.prototype.getEntry = function getEntry(hash, callback) { diff --git a/lib/bcoin/locker.js b/lib/bcoin/locker.js new file mode 100644 index 00000000..a76e4ddf --- /dev/null +++ b/lib/bcoin/locker.js @@ -0,0 +1,125 @@ +/** + * locker.js - lock and queue for bcoin + * Copyright (c) 2014-2015, Fedor Indutny (MIT License) + * https://github.com/indutny/bcoin + */ + +var EventEmitter = require('events').EventEmitter; +var utils = require('./utils'); +var assert = utils.assert; + +/** + * Locker + */ + +function Locker(parent, add, limit) { + if (!(this instanceof Locker)) + return Locker(parent, add, limit); + + this.parent = parent; + this.jobs = []; + this.busy = false; + + this.pending = []; + this.pendingMap = {}; + this.pendingSize = 0; + this.pendingLimit = limit || (20 << 20); + this.add = add; +} + +utils.inherits(Locker, EventEmitter); + +Locker.prototype.hasPending = function hasPending(key) { + return this.pendingMap[key] === true; +}; + +Locker.prototype.lock = function lock(func, args, force) { + var self = this; + var obj, called; + + if (force) { + assert(this.busy); + return function unlock() { + assert(!called); + called = true; + }; + } + + if (this.busy) { + if (this.add && func === this.add) { + obj = args[0]; + this.pending.push(obj); + this.pendingMap[obj.hash('hex')] = true; + this.pendingSize += obj.getSize(); + if (this.pendingSize > this.pendingLimit) { + this.purgePending(); + return; + } + } + this.jobs.push([func, args]); + return; + } + + this.busy = true; + + return function unlock() { + var item, obj; + + assert(!called); + called = true; + + self.busy = false; + + if (self.add && func === self.add) { + if (self.pending.length === 0) + self.emit('flush'); + } + + if (self.jobs.length === 0) + return; + + item = self.jobs.shift(); + + if (self.add && item[0] === self.add) { + obj = item[1][0]; + assert(obj === self.pending.shift()); + delete self.pendingMap[obj.hash('hex')]; + self.pendingSize -= obj.getSize(); + } + + item[0].apply(self.parent, item[1]); + }; +}; + +Locker.prototype.purgePending = function purgePending() { + var self = this; + + assert(this.add); + + utils.debug('Warning: %dmb of pending objects. Purging.', + utils.mb(this.pendingSize)); + + this.pending.forEach(function(obj) { + delete self.pendingMap[obj.hash('hex')]; + }); + + this.pending.length = 0; + this.pendingSize = 0; + + this.jobs = this.jobs.filter(function(item) { + return item[0] !== self.add; + }); +}; + +Locker.prototype.onFlush = function onFlush(callback) { + if (this.pending.length === 0) + return callback(); + + this.once('flush', callback); +}; + +/** + * Expose + */ + +module.exports = Locker; diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index b9016647..bfaacbaa 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -13,6 +13,7 @@ var utils = require('./utils'); var assert = utils.assert; var BufferWriter = require('./writer'); var BufferReader = require('./reader'); +var DUMMY_PEER = { sendReject: function() {} }; /** * Mempool @@ -69,9 +70,12 @@ function Mempool(node, options) { utils.inherits(Mempool, EventEmitter); +Mempool.flags = constants.flags.STANDARD_VERIFY_FLAGS; +Mempool.mandatory = constants.flags.MANDATORY_VERIFY_FLAGS; + Mempool.prototype._lock = function _lock(func, args, force) { var self = this; - var block, called; + var tx, called; if (force) { assert(this.busy); @@ -223,9 +227,6 @@ Mempool.prototype.hasTX = function hasTX(hash, callback) { }); }; -Mempool.flags = constants.flags.STANDARD_VERIFY_FLAGS; -Mempool.mandatory = constants.flags.MANDATORY_VERIFY_FLAGS; - Mempool.prototype.add = Mempool.prototype.addTX = function addTX(tx, peer, callback, force) { var self = this; @@ -236,6 +237,14 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) { if (!unlock) return; + if (typeof peer === 'function') { + callback = peer; + peer = null; + } + + if (!peer) + peer = DUMMY_PEER; + hash = tx.hash('hex'); assert(tx.ts === 0); @@ -321,18 +330,6 @@ Mempool.prototype.addUnchecked = function addUnchecked(tx, peer, callback) { }); }; -function VerifyError(reason, score) { - Error.call(this); - if (Error.captureStackTrace) - Error.captureStackTrace(this, VerifyError); - this.type = 'VerifyError'; - this.message = reason; - this.reason = score === -1 ? null : reason; - this.score = score; -} - -utils.inherits(VerifyError, Error); - Mempool.prototype.verify = function verify(tx, callback) { var self = this; var total, input, coin, i, fee, now; @@ -607,6 +604,9 @@ Mempool.prototype.checkTX = function checkTX(tx, peer) { var total = new bn(0); var uniq = {}; + if (!peer) + peer = DUMMY_PEER; + if (tx.inputs.length === 0) return peer.sendReject(tx, 'bad-txns-vin-empty', 100); @@ -649,6 +649,18 @@ Mempool.prototype.checkTX = function checkTX(tx, peer) { return true; }; +function VerifyError(reason, score) { + Error.call(this); + if (Error.captureStackTrace) + Error.captureStackTrace(this, VerifyError); + this.type = 'VerifyError'; + this.message = reason; + this.reason = score === -1 ? null : reason; + this.score = score; +} + +utils.inherits(VerifyError, Error); + /** * Expose */ diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index a23e3d5b..6ce6e667 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -754,9 +754,9 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { peer.queue.block.length, block.bits, self.peers.all.length, - self.chain.pending.length, + self.chain.locker.pending.length, self.chain.bestHeight, - self.chain.jobs.length); + self.chain.locker.jobs.length); } return callback(null, true); @@ -1145,10 +1145,7 @@ Pool.prototype.watch = function watch(id) { else this.watchMap[hid] = 1; - if (this.bloom.test(id, 'hex')) - return; - - this.bloom.add(id, 'hex'); + this.bloom.add(id); } // Send it to peers @@ -1528,7 +1525,7 @@ Pool.prototype.scheduleRequests = function scheduleRequests(peer) { Pool.prototype._sendRequests = function _sendRequests(peer) { var size, items; - if (this.chain.pending.length > 0) + if (this.chain.locker.pending.length > 0) return; if (peer.queue.block.length === 0) diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index 890a22fb..67c20f79 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -1915,6 +1915,6 @@ utils.SyncBatch = SyncBatch; if (utils.isBrowser) { bn.prototype.toBuffer = function toBuffer(order, size) { - return new Buffer(this.toArray(order, size)); + return this.toArrayLike(Buffer, order, size); }; } diff --git a/lib/bcoin/workers.js b/lib/bcoin/workers.js index 7fe31298..56a89e18 100644 --- a/lib/bcoin/workers.js +++ b/lib/bcoin/workers.js @@ -33,7 +33,7 @@ workers.spawn = function spawn(index) { child = cp.spawn(process.argv[0], [__filename], { stdio: ['pipe', 'pipe', 'inherit'], env: utils.merge({}, process.env, { - BCOIN_WORKER: index + '' + BCOIN_WORKER_ID: index + '' }) }); @@ -121,7 +121,7 @@ bcoin.tx.prototype.verifyAsync = function verifyAsync(index, force, flags, callb workers.listen = function listen() { bcoin.debug = function debug() { - process.stderr.write('Worker %s: ', process.env.BCOIN_WORKER); + process.stderr.write('Worker ' + process.env.BCOIN_WORKER_ID + ':'); return console.error.apply(console.error, arguments); }; @@ -372,5 +372,5 @@ function parser(onPacket) { }; } -if (process.env.BCOIN_WORKER) +if (process.env.BCOIN_WORKER_ID) workers.listen();