From 8c7279518f5341a2482a79ac98f0574468541edc Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 30 Jun 2017 03:03:39 -0700 Subject: [PATCH] refactor: de-globalize workerpool. --- lib/blockchain/chain.js | 19 ++++++---- lib/http/rpc.js | 3 +- lib/mempool/mempool.js | 12 +++++-- lib/mining/cpuminer.js | 7 ++-- lib/mining/miner.js | 8 +++++ lib/node/fullnode.js | 3 ++ lib/node/node.js | 74 +++++++++++++++++---------------------- lib/primitives/mtx.js | 22 ++++++++---- lib/primitives/tx.js | 19 +++++++--- lib/script/sigcache.js | 2 +- lib/wallet/plugin.js | 1 + lib/wallet/server.js | 29 ++++++++++++++- lib/wallet/wallet.js | 3 +- lib/workers/index.js | 3 +- lib/workers/workerpool.js | 33 ++--------------- 15 files changed, 138 insertions(+), 100 deletions(-) diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index b639294d..bdac3a17 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -69,6 +69,7 @@ function Chain(options) { this.network = this.options.network; this.logger = this.options.logger.context('chain'); + this.workers = this.options.workers; this.checkpoints = this.options.checkpoints; this.locker = new Lock(true); @@ -613,7 +614,7 @@ Chain.prototype.verifyInputs = async function verifyInputs(block, prev, state) { } // Push onto verification queue. - jobs.push(tx.verifyAsync(view, state.flags)); + jobs.push(tx.verifyAsync(view, state.flags, this.workers)); } // Add new coins. @@ -698,11 +699,10 @@ Chain.prototype.findFork = async function findFork(fork, longer) { * @method * @private * @param {ChainEntry} competitor - The competing chain's tip. - * @param {Block} block - The being being added. * @returns {Promise} */ -Chain.prototype.reorganize = async function reorganize(competitor, block) { +Chain.prototype.reorganize = async function reorganize(competitor) { let tip = this.tip; let fork = await this.findFork(tip, competitor); let disconnect = []; @@ -758,11 +758,10 @@ Chain.prototype.reorganize = async function reorganize(competitor, block) { * @method * @private * @param {ChainEntry} competitor - The competing chain's tip. - * @param {Block} block - The being being added. * @returns {Promise} */ -Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor, block) { +Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor) { let tip = this.tip; let fork = await this.findFork(tip, competitor); let disconnect = []; @@ -911,9 +910,9 @@ Chain.prototype.setBestChain = async function setBestChain(entry, block, prev, f // In spv-mode, we reset the // chain and redownload the blocks. if (this.options.spv) - return await this.reorganizeSPV(entry, block); + return await this.reorganizeSPV(entry); - await this.reorganize(entry, block); + await this.reorganize(entry); } // Warn of unknown versionbits. @@ -2342,6 +2341,7 @@ function ChainOptions(options) { this.network = Network.primary; this.logger = Logger.global; + this.workers = null; this.prefix = null; this.location = null; @@ -2383,6 +2383,11 @@ ChainOptions.prototype.fromOptions = function fromOptions(options) { this.logger = options.logger; } + if (options.workers != null) { + assert(typeof options.workers === 'object'); + this.workers = options.workers; + } + if (options.spv != null) { assert(typeof options.spv === 'boolean'); this.spv = options.spv; diff --git a/lib/http/rpc.js b/lib/http/rpc.js index eb30b5f4..97e87d87 100644 --- a/lib/http/rpc.js +++ b/lib/http/rpc.js @@ -53,6 +53,7 @@ function RPC(node) { this.node = node; this.network = node.network; + this.workers = node.workers; this.chain = node.chain; this.mempool = node.mempool; this.pool = node.pool; @@ -1906,7 +1907,7 @@ RPC.prototype.signRawTransaction = async function signRawTransaction(args, help) } } - await tx.signAsync(keys, type); + await tx.signAsync(keys, type, this.workers); return { hex: tx.toRaw().toString('hex'), diff --git a/lib/mempool/mempool.js b/lib/mempool/mempool.js index 69b7e21c..c38fe78e 100644 --- a/lib/mempool/mempool.js +++ b/lib/mempool/mempool.js @@ -71,6 +71,7 @@ function Mempool(options) { this.network = this.options.network; this.logger = this.options.logger.context('mempool'); + this.workers = this.options.workers; this.chain = this.options.chain; this.fees = this.options.fees; @@ -1027,13 +1028,13 @@ Mempool.prototype.verifyResult = async function verifyResult(tx, view, flags) { */ Mempool.prototype.verifyInputs = async function verifyInputs(tx, view, flags) { - if (await tx.verifyAsync(view, flags)) + if (await tx.verifyAsync(view, flags, this.workers)) return; if (flags & Script.flags.ONLY_STANDARD_VERIFY_FLAGS) { flags &= ~Script.flags.ONLY_STANDARD_VERIFY_FLAGS; - if (await tx.verifyAsync(view, flags)) { + if (await tx.verifyAsync(view, flags, this.workers)) { throw new VerifyError(tx, 'nonstandard', 'non-mandatory-script-verify-flag', @@ -1923,6 +1924,7 @@ function MempoolOptions(options) { this.network = Network.primary; this.chain = null; this.logger = null; + this.workers = null; this.fees = null; this.limitFree = true; @@ -1968,6 +1970,7 @@ MempoolOptions.prototype.fromOptions = function fromOptions(options) { this.chain = options.chain; this.network = options.chain.network; this.logger = options.chain.logger; + this.workers = options.chain.workers; this.requireStandard = this.network.requireStandard; this.minRelay = this.network.minRelay; @@ -1977,6 +1980,11 @@ MempoolOptions.prototype.fromOptions = function fromOptions(options) { this.logger = options.logger; } + if (options.workers != null) { + assert(typeof options.workers === 'object'); + this.workers = options.workers; + } + if (options.fees != null) { assert(typeof options.fees === 'object'); this.fees = options.fees; diff --git a/lib/mining/cpuminer.js b/lib/mining/cpuminer.js index 6c27eaf1..6c30bbf6 100644 --- a/lib/mining/cpuminer.js +++ b/lib/mining/cpuminer.js @@ -11,7 +11,6 @@ const assert = require('assert'); const util = require('../utils/util'); const co = require('../utils/co'); const AsyncObject = require('../utils/asyncobject'); -const workerPool = require('../workers/workerpool').pool; const mine = require('./mine'); const Lock = require('../utils/lock'); @@ -33,6 +32,7 @@ function CPUMiner(miner) { this.miner = miner; this.network = this.miner.network; this.logger = this.miner.logger.context('cpuminer'); + this.workers = this.miner.workers; this.chain = this.miner.chain; this.locker = new Lock(); @@ -329,8 +329,11 @@ CPUMiner.prototype.findNonceAsync = async function findNonceAsync(job) { let max = interval; let nonce; + if (!this.workers) + return this.findNonce(job); + while (max <= 0xffffffff) { - nonce = await workerPool.mine(data, target, min, max); + nonce = await this.workers.mine(data, target, min, max); if (nonce !== -1) break; diff --git a/lib/mining/miner.js b/lib/mining/miner.js index 2c56fd38..88aae366 100644 --- a/lib/mining/miner.js +++ b/lib/mining/miner.js @@ -36,6 +36,7 @@ function Miner(options) { this.options = new MinerOptions(options); this.network = this.options.network; this.logger = this.options.logger.context('miner'); + this.workers = null; this.chain = this.options.chain; this.mempool = this.options.mempool; this.addresses = this.options.addresses; @@ -378,6 +379,7 @@ function MinerOptions(options) { this.network = Network.primary; this.logger = null; + this.workers = null; this.chain = null; this.mempool = null; @@ -412,12 +414,18 @@ MinerOptions.prototype.fromOptions = function fromOptions(options) { this.chain = options.chain; this.network = options.chain.network; this.logger = options.chain.logger; + this.workers = options.chain.workers; if (options.logger != null) { assert(typeof options.logger === 'object'); this.logger = options.logger; } + if (options.workers != null) { + assert(typeof options.workers === 'object'); + this.workers = options.workers; + } + if (options.mempool != null) { assert(typeof options.mempool === 'object'); this.mempool = options.mempool; diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index f6e391fb..7ce70dd0 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -51,6 +51,7 @@ function FullNode(options) { this.chain = new Chain({ network: this.network, logger: this.logger, + workers: this.workers, db: this.config.str('db'), prefix: this.config.prefix, maxFiles: this.config.num('max-files'), @@ -73,6 +74,7 @@ function FullNode(options) { this.mempool = new Mempool({ network: this.network, logger: this.logger, + workers: this.workers, chain: this.chain, fees: this.fees, db: this.config.str('db'), @@ -120,6 +122,7 @@ function FullNode(options) { this.miner = new Miner({ network: this.network, logger: this.logger, + workers: this.workers, chain: this.chain, mempool: this.mempool, address: this.config.array('coinbase-address'), diff --git a/lib/node/node.js b/lib/node/node.js index 7d0f7e04..73434024 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -12,7 +12,7 @@ const AsyncObject = require('../utils/asyncobject'); const util = require('../utils/util'); const Network = require('../protocol/network'); const Logger = require('./logger'); -const workerPool = require('../workers/workerpool').pool; +const WorkerPool = require('../workers/workerpool'); const secp256k1 = require('../crypto/secp256k1'); const native = require('../native'); const Config = require('./config'); @@ -47,6 +47,7 @@ function Node(options) { this.spv = false; this.logger = null; + this.workers = null; this.chain = null; this.fees = null; this.mempool = null; @@ -82,6 +83,12 @@ Node.prototype.initOptions = function initOptions() { }); this.logger = logger.context('node'); + + this.workers = new WorkerPool({ + enabled: config.str('workers-enabled'), + size: config.num('workers-size'), + timeout: config.num('workers-timeout') + }); }; /** @@ -95,6 +102,26 @@ Node.prototype.init = function init() { this.on('error', () => {}); + this.workers.on('spawn', (child) => { + this.logger.info('Spawning worker process: %d.', child.id); + }); + + this.workers.on('exit', (code, child) => { + this.logger.warning('Worker %d exited: %s.', child.id, code); + }); + + this.workers.on('log', (log) => { + this.logger.debug(log); + }); + + this.workers.on('error', (err, child) => { + if (child) { + this.logger.error('Worker %d error: %s', child.id, err.message); + return; + } + this.emit('error', err); + }); + this.hook('preopen', () => { this.handlePreopen(); }); @@ -153,22 +180,6 @@ Node.prototype.handlePreopen = async function handlePreopen() { this.logger.warning('Adjusted time mismatch!'); this.logger.warning('Please make sure your system clock is correct!'); }); - - this.bind(workerPool, 'spawn', (child) => { - this.logger.info('Spawning worker process: %d.', child.id); - }); - - this.bind(workerPool, 'exit', (code, child) => { - this.logger.warning('Worker %d exited: %s.', child.id, code); - }); - - this.bind(workerPool, 'error', (err, child) => { - if (child) { - this.logger.error('Worker %d error: %s', child.id, err.message); - return; - } - this.emit('error', err); - }); }; /** @@ -189,7 +200,7 @@ Node.prototype.handleOpen = async function handleOpen() { this.logger.warning('Hashing will be slow.'); } - if (!workerPool.enabled) { + if (!this.workers.enabled) { this.logger.warning('Warning: worker pool is disabled.'); this.logger.warning('Verification will be slow.'); } @@ -209,12 +220,11 @@ Node.prototype.handlePreclose = async function handlePreclose() { */ Node.prototype.handleClose = async function handleClose() { - this.startTime = -1; - - for (let bound of this.bound) - bound[0].removeListener(bound[1], bound[2]); + for (let [obj, event, listener] of this.bound) + obj.removeListener(event, listener); this.bound.length = 0; + this.startTime = -1; await this.logger.close(); }; @@ -239,25 +249,7 @@ Node.prototype.bind = function bind(obj, event, listener) { */ Node.prototype.error = function error(err) { - if (!err) - return; - - if (err.type === 'VerifyError') { - switch (err.reason) { - case 'insufficient priority': - case 'non-final': - this.logger.spam(err.message); - break; - default: - this.logger.error(err.message); - break; - } - } else if (typeof err.code === 'string' && err.code[0] === 'E') { - this.logger.error(err.message); - } else { - this.logger.error(err); - } - + this.logger.error(err); this.emit('error', err); }; diff --git a/lib/primitives/mtx.js b/lib/primitives/mtx.js index 89320caf..0e7f4295 100644 --- a/lib/primitives/mtx.js +++ b/lib/primitives/mtx.js @@ -17,7 +17,6 @@ const Coin = require('./coin'); const Outpoint = require('./outpoint'); const CoinView = require('../coins/coinview'); const Address = require('./address'); -const workerPool = require('../workers/workerpool').pool; const encoding = require('../utils/encoding'); const consensus = require('../protocol/consensus'); const policy = require('../protocol/policy'); @@ -257,11 +256,12 @@ MTX.prototype.verify = function verify(flags) { * Verify the transaction inputs on the worker pool * (if workers are enabled). * @param {VerifyFlags?} [flags=STANDARD_VERIFY_FLAGS] + * @param {WorkerPool?} pool * @returns {Promise} */ -MTX.prototype.verifyAsync = function verifyAsync(flags) { - return TX.prototype.verifyAsync.call(this, this.view, flags); +MTX.prototype.verifyAsync = function verifyAsync(flags, pool) { + return TX.prototype.verifyAsync.call(this, this.view, flags, pool); }; /** @@ -561,11 +561,15 @@ MTX.prototype.scriptVector = function scriptVector(prev, vector, ring) { * @param {Coin|Output} coin * @param {KeyRing} ring * @param {SighashType?} type + * @param {WorkerPool?} pool * @returns {Promise} */ -MTX.prototype.signInputAsync = function signInputAsync(index, coin, ring, type) { - return workerPool.signInput(this, index, coin, ring, type); +MTX.prototype.signInputAsync = function signInputAsync(index, coin, ring, type, pool) { + if (!pool) + return this.signInput(index, coin, ring, type); + + return pool.signInput(this, index, coin, ring, type, pool); }; /** @@ -974,11 +978,15 @@ MTX.prototype.sign = function sign(ring, type) { * (if workers are enabled). * @param {KeyRing} ring * @param {SighashType?} type + * @param {WorkerPool?} pool * @returns {Promise} */ -MTX.prototype.signAsync = function signAsync(ring, type) { - return workerPool.sign(this, ring, type); +MTX.prototype.signAsync = function signAsync(ring, type, pool) { + if (!pool) + return this.sign(ring, type); + + return pool.sign(this, ring, type); }; /** diff --git a/lib/primitives/tx.js b/lib/primitives/tx.js index 682c18ff..66164075 100644 --- a/lib/primitives/tx.js +++ b/lib/primitives/tx.js @@ -21,7 +21,6 @@ const Input = require('./input'); const Output = require('./output'); const Outpoint = require('./outpoint'); const InvItem = require('./invitem'); -const workerPool = require('../workers/workerpool').pool; const Bloom = require('../utils/bloom'); const consensus = require('../protocol/consensus'); const policy = require('../protocol/policy'); @@ -845,17 +844,21 @@ TX.prototype.checkInput = function checkInput(index, coin, flags) { * (if workers are enabled). * @param {CoinView} view * @param {VerifyFlags?} [flags=STANDARD_VERIFY_FLAGS] + * @param {WorkerPool?} pool * @returns {Promise} */ -TX.prototype.verifyAsync = async function verifyAsync(view, flags) { +TX.prototype.verifyAsync = async function verifyAsync(view, flags, pool) { if (this.inputs.length === 0) return false; if (this.isCoinbase()) return true; - return await workerPool.verify(this, view, flags); + if (!pool) + return this.verify(view, flags); + + return await pool.verify(this, view, flags); }; /** @@ -864,13 +867,19 @@ TX.prototype.verifyAsync = async function verifyAsync(view, flags) { * verified. * @param {Coin|Output} coin - Previous output. * @param {VerifyFlags} [flags=STANDARD_VERIFY_FLAGS] + * @param {WorkerPool?} pool * @returns {Promise} */ -TX.prototype.verifyInputAsync = async function verifyInputAsync(index, coin, flags) { +TX.prototype.verifyInputAsync = async function verifyInputAsync(index, coin, flags, pool) { let input = this.inputs[index]; + assert(input, 'Input does not exist.'); - return await workerPool.verifyInput(this, index, coin, flags); + + if (!pool) + return this.verifyInput(index, coin, flags); + + return await pool.verifyInput(this, index, coin, flags); }; /** diff --git a/lib/script/sigcache.js b/lib/script/sigcache.js index 9387ae5c..61490c7a 100644 --- a/lib/script/sigcache.js +++ b/lib/script/sigcache.js @@ -150,4 +150,4 @@ SigCacheEntry.prototype.equal = function equal(sig, key) { * Expose */ -module.exports = new SigCache(+process.env.BCOIN_SIGCACHE_SIZE || 0); +module.exports = SigCache; diff --git a/lib/wallet/plugin.js b/lib/wallet/plugin.js index 5cdff00d..9e7ef27b 100644 --- a/lib/wallet/plugin.js +++ b/lib/wallet/plugin.js @@ -36,6 +36,7 @@ plugin.init = function init(node) { wdb = new WalletDB({ network: node.network, logger: node.logger, + workers: node.workers, client: client, prefix: config.prefix, db: config.str(['wallet-db', 'db']), diff --git a/lib/wallet/server.js b/lib/wallet/server.js index b44f6836..9f7fd318 100644 --- a/lib/wallet/server.js +++ b/lib/wallet/server.js @@ -26,7 +26,7 @@ const server = exports; server.create = function create(options) { let config = new Config('bcoin'); let logger = new Logger('debug'); - let client, wdb; + let client, wdb, workers; config.inject(options); config.load(options); @@ -52,9 +52,36 @@ server.create = function create(options) { shrink: config.bool('log-shrink') }); + workers = new WorkerPool({ + enabled: config.str('workers-enabled'), + size: config.num('workers-size'), + timeout: config.num('workers-timeout') + }); + + workers.on('spawn', (child) => { + logger.info('Spawning worker process: %d.', child.id); + }); + + workers.on('exit', (code, child) => { + logger.warning('Worker %d exited: %s.', child.id, code); + }); + + workers.on('log', (log) => { + logger.debug(log); + }); + + workers.on('error', (err, child) => { + if (child) { + logger.error('Worker %d error: %s', child.id, err.message); + return; + } + wdb.emit('error', err); + }); + wdb = new WalletDB({ network: config.network, logger: logger, + workers: workers, client: client, prefix: config.prefix, db: config.str('db'), diff --git a/lib/wallet/wallet.js b/lib/wallet/wallet.js index 67a62564..5dbffb71 100644 --- a/lib/wallet/wallet.js +++ b/lib/wallet/wallet.js @@ -24,6 +24,7 @@ const Path = require('./path'); const common = require('./common'); const Address = require('../primitives/address'); const MTX = require('../primitives/mtx'); +const Script = require('../script/script'); const WalletKey = require('./walletkey'); const HD = require('../hd/hd'); const Output = require('../primitives/output'); @@ -2071,7 +2072,7 @@ Wallet.prototype.sign = async function sign(mtx, passphrase) { rings = await this.deriveInputs(mtx); - return await mtx.signAsync(rings); + return await mtx.signAsync(rings, Script.hashType.ALL, this.db.workers); }; /** diff --git a/lib/workers/index.js b/lib/workers/index.js index 98c0cc0a..21edc482 100644 --- a/lib/workers/index.js +++ b/lib/workers/index.js @@ -17,5 +17,4 @@ exports.packets = require('./packets'); exports.ParserClient = require('./parser-client'); exports.Parser = require('./parser'); // exports.worker = require('./worker'); -exports.Worker = require('./workerpool').Worker; -exports.WorkerPool = require('./workerpool').WorkerPool; +exports.WorkerPool = require('./workerpool'); diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index ec4c8e75..e62ce21f 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -49,8 +49,6 @@ function WorkerPool(options) { this.enabled = true; this.set(options); - - this.on('error', () => {}); } util.inherits(WorkerPool, EventEmitter); @@ -660,8 +658,8 @@ Worker.prototype.handlePacket = function handlePacket(packet) { this.emit('event', packet.items); break; case packets.types.LOG: - util.log('Worker %d:', this.id); - util.log(packet.text); + this.emit('log', 'Worker ' + this.id); + this.emit('log', packet.text); break; case packets.types.ERROR: this.emit('error', packet.error); @@ -897,33 +895,8 @@ function getCores() { return Math.max(2, os.cpus().length); } -/* - * Default Pool - */ - -exports.pool = new WorkerPool(); - -exports.set = function set(options) { - this.pool.set(options); -}; - -exports.enable = function enable() { - this.pool.enable(); -}; - -exports.disable = function disable() { - this.pool.disable(); -}; - -exports.set({ - enabled: +process.env.BCOIN_WORKERS_ENABLED !== 0, - size: +process.env.BCOIN_WORKERS_SIZE || null, - timeout: +process.env.BCOIN_WORKERS_TIMEOUT || null -}); - /* * Expose */ -exports.WorkerPool = WorkerPool; -exports.Worker = Worker; +module.exports = WorkerPool;