refactor: de-globalize workerpool.

This commit is contained in:
Christopher Jeffrey 2017-06-30 03:03:39 -07:00
parent c53f4cf89e
commit 8c7279518f
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
15 changed files with 138 additions and 100 deletions

View File

@ -69,6 +69,7 @@ function Chain(options) {
this.network = this.options.network;
this.logger = this.options.logger.context('chain');
this.workers = this.options.workers;
this.checkpoints = this.options.checkpoints;
this.locker = new Lock(true);
@ -613,7 +614,7 @@ Chain.prototype.verifyInputs = async function verifyInputs(block, prev, state) {
}
// Push onto verification queue.
jobs.push(tx.verifyAsync(view, state.flags));
jobs.push(tx.verifyAsync(view, state.flags, this.workers));
}
// Add new coins.
@ -698,11 +699,10 @@ Chain.prototype.findFork = async function findFork(fork, longer) {
* @method
* @private
* @param {ChainEntry} competitor - The competing chain's tip.
* @param {Block} block - The being being added.
* @returns {Promise}
*/
Chain.prototype.reorganize = async function reorganize(competitor, block) {
Chain.prototype.reorganize = async function reorganize(competitor) {
let tip = this.tip;
let fork = await this.findFork(tip, competitor);
let disconnect = [];
@ -758,11 +758,10 @@ Chain.prototype.reorganize = async function reorganize(competitor, block) {
* @method
* @private
* @param {ChainEntry} competitor - The competing chain's tip.
* @param {Block} block - The being being added.
* @returns {Promise}
*/
Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor, block) {
Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor) {
let tip = this.tip;
let fork = await this.findFork(tip, competitor);
let disconnect = [];
@ -911,9 +910,9 @@ Chain.prototype.setBestChain = async function setBestChain(entry, block, prev, f
// In spv-mode, we reset the
// chain and redownload the blocks.
if (this.options.spv)
return await this.reorganizeSPV(entry, block);
return await this.reorganizeSPV(entry);
await this.reorganize(entry, block);
await this.reorganize(entry);
}
// Warn of unknown versionbits.
@ -2342,6 +2341,7 @@ function ChainOptions(options) {
this.network = Network.primary;
this.logger = Logger.global;
this.workers = null;
this.prefix = null;
this.location = null;
@ -2383,6 +2383,11 @@ ChainOptions.prototype.fromOptions = function fromOptions(options) {
this.logger = options.logger;
}
if (options.workers != null) {
assert(typeof options.workers === 'object');
this.workers = options.workers;
}
if (options.spv != null) {
assert(typeof options.spv === 'boolean');
this.spv = options.spv;

View File

@ -53,6 +53,7 @@ function RPC(node) {
this.node = node;
this.network = node.network;
this.workers = node.workers;
this.chain = node.chain;
this.mempool = node.mempool;
this.pool = node.pool;
@ -1906,7 +1907,7 @@ RPC.prototype.signRawTransaction = async function signRawTransaction(args, help)
}
}
await tx.signAsync(keys, type);
await tx.signAsync(keys, type, this.workers);
return {
hex: tx.toRaw().toString('hex'),

View File

@ -71,6 +71,7 @@ function Mempool(options) {
this.network = this.options.network;
this.logger = this.options.logger.context('mempool');
this.workers = this.options.workers;
this.chain = this.options.chain;
this.fees = this.options.fees;
@ -1027,13 +1028,13 @@ Mempool.prototype.verifyResult = async function verifyResult(tx, view, flags) {
*/
Mempool.prototype.verifyInputs = async function verifyInputs(tx, view, flags) {
if (await tx.verifyAsync(view, flags))
if (await tx.verifyAsync(view, flags, this.workers))
return;
if (flags & Script.flags.ONLY_STANDARD_VERIFY_FLAGS) {
flags &= ~Script.flags.ONLY_STANDARD_VERIFY_FLAGS;
if (await tx.verifyAsync(view, flags)) {
if (await tx.verifyAsync(view, flags, this.workers)) {
throw new VerifyError(tx,
'nonstandard',
'non-mandatory-script-verify-flag',
@ -1923,6 +1924,7 @@ function MempoolOptions(options) {
this.network = Network.primary;
this.chain = null;
this.logger = null;
this.workers = null;
this.fees = null;
this.limitFree = true;
@ -1968,6 +1970,7 @@ MempoolOptions.prototype.fromOptions = function fromOptions(options) {
this.chain = options.chain;
this.network = options.chain.network;
this.logger = options.chain.logger;
this.workers = options.chain.workers;
this.requireStandard = this.network.requireStandard;
this.minRelay = this.network.minRelay;
@ -1977,6 +1980,11 @@ MempoolOptions.prototype.fromOptions = function fromOptions(options) {
this.logger = options.logger;
}
if (options.workers != null) {
assert(typeof options.workers === 'object');
this.workers = options.workers;
}
if (options.fees != null) {
assert(typeof options.fees === 'object');
this.fees = options.fees;

View File

@ -11,7 +11,6 @@ const assert = require('assert');
const util = require('../utils/util');
const co = require('../utils/co');
const AsyncObject = require('../utils/asyncobject');
const workerPool = require('../workers/workerpool').pool;
const mine = require('./mine');
const Lock = require('../utils/lock');
@ -33,6 +32,7 @@ function CPUMiner(miner) {
this.miner = miner;
this.network = this.miner.network;
this.logger = this.miner.logger.context('cpuminer');
this.workers = this.miner.workers;
this.chain = this.miner.chain;
this.locker = new Lock();
@ -329,8 +329,11 @@ CPUMiner.prototype.findNonceAsync = async function findNonceAsync(job) {
let max = interval;
let nonce;
if (!this.workers)
return this.findNonce(job);
while (max <= 0xffffffff) {
nonce = await workerPool.mine(data, target, min, max);
nonce = await this.workers.mine(data, target, min, max);
if (nonce !== -1)
break;

View File

@ -36,6 +36,7 @@ function Miner(options) {
this.options = new MinerOptions(options);
this.network = this.options.network;
this.logger = this.options.logger.context('miner');
this.workers = null;
this.chain = this.options.chain;
this.mempool = this.options.mempool;
this.addresses = this.options.addresses;
@ -378,6 +379,7 @@ function MinerOptions(options) {
this.network = Network.primary;
this.logger = null;
this.workers = null;
this.chain = null;
this.mempool = null;
@ -412,12 +414,18 @@ MinerOptions.prototype.fromOptions = function fromOptions(options) {
this.chain = options.chain;
this.network = options.chain.network;
this.logger = options.chain.logger;
this.workers = options.chain.workers;
if (options.logger != null) {
assert(typeof options.logger === 'object');
this.logger = options.logger;
}
if (options.workers != null) {
assert(typeof options.workers === 'object');
this.workers = options.workers;
}
if (options.mempool != null) {
assert(typeof options.mempool === 'object');
this.mempool = options.mempool;

View File

@ -51,6 +51,7 @@ function FullNode(options) {
this.chain = new Chain({
network: this.network,
logger: this.logger,
workers: this.workers,
db: this.config.str('db'),
prefix: this.config.prefix,
maxFiles: this.config.num('max-files'),
@ -73,6 +74,7 @@ function FullNode(options) {
this.mempool = new Mempool({
network: this.network,
logger: this.logger,
workers: this.workers,
chain: this.chain,
fees: this.fees,
db: this.config.str('db'),
@ -120,6 +122,7 @@ function FullNode(options) {
this.miner = new Miner({
network: this.network,
logger: this.logger,
workers: this.workers,
chain: this.chain,
mempool: this.mempool,
address: this.config.array('coinbase-address'),

View File

@ -12,7 +12,7 @@ const AsyncObject = require('../utils/asyncobject');
const util = require('../utils/util');
const Network = require('../protocol/network');
const Logger = require('./logger');
const workerPool = require('../workers/workerpool').pool;
const WorkerPool = require('../workers/workerpool');
const secp256k1 = require('../crypto/secp256k1');
const native = require('../native');
const Config = require('./config');
@ -47,6 +47,7 @@ function Node(options) {
this.spv = false;
this.logger = null;
this.workers = null;
this.chain = null;
this.fees = null;
this.mempool = null;
@ -82,6 +83,12 @@ Node.prototype.initOptions = function initOptions() {
});
this.logger = logger.context('node');
this.workers = new WorkerPool({
enabled: config.str('workers-enabled'),
size: config.num('workers-size'),
timeout: config.num('workers-timeout')
});
};
/**
@ -95,6 +102,26 @@ Node.prototype.init = function init() {
this.on('error', () => {});
this.workers.on('spawn', (child) => {
this.logger.info('Spawning worker process: %d.', child.id);
});
this.workers.on('exit', (code, child) => {
this.logger.warning('Worker %d exited: %s.', child.id, code);
});
this.workers.on('log', (log) => {
this.logger.debug(log);
});
this.workers.on('error', (err, child) => {
if (child) {
this.logger.error('Worker %d error: %s', child.id, err.message);
return;
}
this.emit('error', err);
});
this.hook('preopen', () => {
this.handlePreopen();
});
@ -153,22 +180,6 @@ Node.prototype.handlePreopen = async function handlePreopen() {
this.logger.warning('Adjusted time mismatch!');
this.logger.warning('Please make sure your system clock is correct!');
});
this.bind(workerPool, 'spawn', (child) => {
this.logger.info('Spawning worker process: %d.', child.id);
});
this.bind(workerPool, 'exit', (code, child) => {
this.logger.warning('Worker %d exited: %s.', child.id, code);
});
this.bind(workerPool, 'error', (err, child) => {
if (child) {
this.logger.error('Worker %d error: %s', child.id, err.message);
return;
}
this.emit('error', err);
});
};
/**
@ -189,7 +200,7 @@ Node.prototype.handleOpen = async function handleOpen() {
this.logger.warning('Hashing will be slow.');
}
if (!workerPool.enabled) {
if (!this.workers.enabled) {
this.logger.warning('Warning: worker pool is disabled.');
this.logger.warning('Verification will be slow.');
}
@ -209,12 +220,11 @@ Node.prototype.handlePreclose = async function handlePreclose() {
*/
Node.prototype.handleClose = async function handleClose() {
this.startTime = -1;
for (let bound of this.bound)
bound[0].removeListener(bound[1], bound[2]);
for (let [obj, event, listener] of this.bound)
obj.removeListener(event, listener);
this.bound.length = 0;
this.startTime = -1;
await this.logger.close();
};
@ -239,25 +249,7 @@ Node.prototype.bind = function bind(obj, event, listener) {
*/
Node.prototype.error = function error(err) {
if (!err)
return;
if (err.type === 'VerifyError') {
switch (err.reason) {
case 'insufficient priority':
case 'non-final':
this.logger.spam(err.message);
break;
default:
this.logger.error(err.message);
break;
}
} else if (typeof err.code === 'string' && err.code[0] === 'E') {
this.logger.error(err.message);
} else {
this.logger.error(err);
}
this.logger.error(err);
this.emit('error', err);
};

View File

@ -17,7 +17,6 @@ const Coin = require('./coin');
const Outpoint = require('./outpoint');
const CoinView = require('../coins/coinview');
const Address = require('./address');
const workerPool = require('../workers/workerpool').pool;
const encoding = require('../utils/encoding');
const consensus = require('../protocol/consensus');
const policy = require('../protocol/policy');
@ -257,11 +256,12 @@ MTX.prototype.verify = function verify(flags) {
* Verify the transaction inputs on the worker pool
* (if workers are enabled).
* @param {VerifyFlags?} [flags=STANDARD_VERIFY_FLAGS]
* @param {WorkerPool?} pool
* @returns {Promise}
*/
MTX.prototype.verifyAsync = function verifyAsync(flags) {
return TX.prototype.verifyAsync.call(this, this.view, flags);
MTX.prototype.verifyAsync = function verifyAsync(flags, pool) {
return TX.prototype.verifyAsync.call(this, this.view, flags, pool);
};
/**
@ -561,11 +561,15 @@ MTX.prototype.scriptVector = function scriptVector(prev, vector, ring) {
* @param {Coin|Output} coin
* @param {KeyRing} ring
* @param {SighashType?} type
* @param {WorkerPool?} pool
* @returns {Promise}
*/
MTX.prototype.signInputAsync = function signInputAsync(index, coin, ring, type) {
return workerPool.signInput(this, index, coin, ring, type);
MTX.prototype.signInputAsync = function signInputAsync(index, coin, ring, type, pool) {
if (!pool)
return this.signInput(index, coin, ring, type);
return pool.signInput(this, index, coin, ring, type, pool);
};
/**
@ -974,11 +978,15 @@ MTX.prototype.sign = function sign(ring, type) {
* (if workers are enabled).
* @param {KeyRing} ring
* @param {SighashType?} type
* @param {WorkerPool?} pool
* @returns {Promise}
*/
MTX.prototype.signAsync = function signAsync(ring, type) {
return workerPool.sign(this, ring, type);
MTX.prototype.signAsync = function signAsync(ring, type, pool) {
if (!pool)
return this.sign(ring, type);
return pool.sign(this, ring, type);
};
/**

View File

@ -21,7 +21,6 @@ const Input = require('./input');
const Output = require('./output');
const Outpoint = require('./outpoint');
const InvItem = require('./invitem');
const workerPool = require('../workers/workerpool').pool;
const Bloom = require('../utils/bloom');
const consensus = require('../protocol/consensus');
const policy = require('../protocol/policy');
@ -845,17 +844,21 @@ TX.prototype.checkInput = function checkInput(index, coin, flags) {
* (if workers are enabled).
* @param {CoinView} view
* @param {VerifyFlags?} [flags=STANDARD_VERIFY_FLAGS]
* @param {WorkerPool?} pool
* @returns {Promise}
*/
TX.prototype.verifyAsync = async function verifyAsync(view, flags) {
TX.prototype.verifyAsync = async function verifyAsync(view, flags, pool) {
if (this.inputs.length === 0)
return false;
if (this.isCoinbase())
return true;
return await workerPool.verify(this, view, flags);
if (!pool)
return this.verify(view, flags);
return await pool.verify(this, view, flags);
};
/**
@ -864,13 +867,19 @@ TX.prototype.verifyAsync = async function verifyAsync(view, flags) {
* verified.
* @param {Coin|Output} coin - Previous output.
* @param {VerifyFlags} [flags=STANDARD_VERIFY_FLAGS]
* @param {WorkerPool?} pool
* @returns {Promise}
*/
TX.prototype.verifyInputAsync = async function verifyInputAsync(index, coin, flags) {
TX.prototype.verifyInputAsync = async function verifyInputAsync(index, coin, flags, pool) {
let input = this.inputs[index];
assert(input, 'Input does not exist.');
return await workerPool.verifyInput(this, index, coin, flags);
if (!pool)
return this.verifyInput(index, coin, flags);
return await pool.verifyInput(this, index, coin, flags);
};
/**

View File

@ -150,4 +150,4 @@ SigCacheEntry.prototype.equal = function equal(sig, key) {
* Expose
*/
module.exports = new SigCache(+process.env.BCOIN_SIGCACHE_SIZE || 0);
module.exports = SigCache;

View File

@ -36,6 +36,7 @@ plugin.init = function init(node) {
wdb = new WalletDB({
network: node.network,
logger: node.logger,
workers: node.workers,
client: client,
prefix: config.prefix,
db: config.str(['wallet-db', 'db']),

View File

@ -26,7 +26,7 @@ const server = exports;
server.create = function create(options) {
let config = new Config('bcoin');
let logger = new Logger('debug');
let client, wdb;
let client, wdb, workers;
config.inject(options);
config.load(options);
@ -52,9 +52,36 @@ server.create = function create(options) {
shrink: config.bool('log-shrink')
});
workers = new WorkerPool({
enabled: config.str('workers-enabled'),
size: config.num('workers-size'),
timeout: config.num('workers-timeout')
});
workers.on('spawn', (child) => {
logger.info('Spawning worker process: %d.', child.id);
});
workers.on('exit', (code, child) => {
logger.warning('Worker %d exited: %s.', child.id, code);
});
workers.on('log', (log) => {
logger.debug(log);
});
workers.on('error', (err, child) => {
if (child) {
logger.error('Worker %d error: %s', child.id, err.message);
return;
}
wdb.emit('error', err);
});
wdb = new WalletDB({
network: config.network,
logger: logger,
workers: workers,
client: client,
prefix: config.prefix,
db: config.str('db'),

View File

@ -24,6 +24,7 @@ const Path = require('./path');
const common = require('./common');
const Address = require('../primitives/address');
const MTX = require('../primitives/mtx');
const Script = require('../script/script');
const WalletKey = require('./walletkey');
const HD = require('../hd/hd');
const Output = require('../primitives/output');
@ -2071,7 +2072,7 @@ Wallet.prototype.sign = async function sign(mtx, passphrase) {
rings = await this.deriveInputs(mtx);
return await mtx.signAsync(rings);
return await mtx.signAsync(rings, Script.hashType.ALL, this.db.workers);
};
/**

View File

@ -17,5 +17,4 @@ exports.packets = require('./packets');
exports.ParserClient = require('./parser-client');
exports.Parser = require('./parser');
// exports.worker = require('./worker');
exports.Worker = require('./workerpool').Worker;
exports.WorkerPool = require('./workerpool').WorkerPool;
exports.WorkerPool = require('./workerpool');

View File

@ -49,8 +49,6 @@ function WorkerPool(options) {
this.enabled = true;
this.set(options);
this.on('error', () => {});
}
util.inherits(WorkerPool, EventEmitter);
@ -660,8 +658,8 @@ Worker.prototype.handlePacket = function handlePacket(packet) {
this.emit('event', packet.items);
break;
case packets.types.LOG:
util.log('Worker %d:', this.id);
util.log(packet.text);
this.emit('log', 'Worker ' + this.id);
this.emit('log', packet.text);
break;
case packets.types.ERROR:
this.emit('error', packet.error);
@ -897,33 +895,8 @@ function getCores() {
return Math.max(2, os.cpus().length);
}
/*
* Default Pool
*/
exports.pool = new WorkerPool();
exports.set = function set(options) {
this.pool.set(options);
};
exports.enable = function enable() {
this.pool.enable();
};
exports.disable = function disable() {
this.pool.disable();
};
exports.set({
enabled: +process.env.BCOIN_WORKERS_ENABLED !== 0,
size: +process.env.BCOIN_WORKERS_SIZE || null,
timeout: +process.env.BCOIN_WORKERS_TIMEOUT || null
});
/*
* Expose
*/
exports.WorkerPool = WorkerPool;
exports.Worker = Worker;
module.exports = WorkerPool;