From f158a73a2d187f8ac7cbb0aeaea1ceae2d6d1c15 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 22 Sep 2016 02:49:18 -0700 Subject: [PATCH] refactor: fixes. workers. locks. --- bin/cli | 1 + lib/chain/chain.js | 1 - lib/chain/chaindb.js | 8 ++-- lib/db/lowlevelup.js | 6 +-- lib/http/rpc.js | 9 ++--- lib/primitives/mtx.js | 24 ++++++++++++ lib/primitives/tx.js | 30 +++++++++++++++ lib/utils/locker.js | 83 ++++++++++++++++++++++++++---------------- lib/utils/spawn.js | 27 +++++++++++++- lib/wallet/txdb.js | 3 +- lib/wallet/wallet.js | 4 +- lib/workers/jobs.js | 69 ++++++++++++++++++++++++++++++++--- lib/workers/workers.js | 65 +++++++++++++++++++++++++++++++-- 13 files changed, 272 insertions(+), 58 deletions(-) diff --git a/bin/cli b/bin/cli index e7ef18cd..240e7ea4 100755 --- a/bin/cli +++ b/bin/cli @@ -7,6 +7,7 @@ var utils = require('../lib/utils/utils'); var spawn = require('../lib/utils/spawn'); var Client = require('../lib/http/client'); var Wallet = require('../lib/http/wallet'); +var co = spawn.co; var assert = utils.assert; var main; diff --git a/lib/chain/chain.js b/lib/chain/chain.js index 0300ce39..9fdcfe36 100644 --- a/lib/chain/chain.js +++ b/lib/chain/chain.js @@ -894,7 +894,6 @@ Chain.prototype.setBestChain = co(function* setBestChain(entry, block, prev) { } // Otherwise, everything is in order. - // Do "contextual" verification on our block // now that we're certain its previous // block is in the chain. diff --git a/lib/chain/chaindb.js b/lib/chain/chaindb.js index ffa4684d..bd6bccab 100644 --- a/lib/chain/chaindb.js +++ b/lib/chain/chaindb.js @@ -1069,7 +1069,7 @@ ChainDB.prototype.fillCoins = co(function* fillCoins(tx) { */ ChainDB.prototype.fillHistory = co(function* fillHistory(tx) { - var i, input, tx; + var i, input, ptx; if (!this.options.indexTX) return tx; @@ -1083,10 +1083,10 @@ ChainDB.prototype.fillHistory = co(function* fillHistory(tx) { if (input.coin) continue; - tx = yield this.getTX(input.prevout.hash); + ptx = yield this.getTX(input.prevout.hash); - if (tx) - input.coin = bcoin.coin.fromTX(tx, input.prevout.index); + if (ptx) + input.coin = bcoin.coin.fromTX(ptx, input.prevout.index); } return tx; diff --git a/lib/db/lowlevelup.js b/lib/db/lowlevelup.js index 2b9c5b72..569a4683 100644 --- a/lib/db/lowlevelup.js +++ b/lib/db/lowlevelup.js @@ -97,7 +97,7 @@ LowlevelUp.prototype.destroy = function destroy() { return new Promise(function(resolve, reject) { if (!self.backend.destroy) - return utils.asyncify(reject)(new Error('Cannot destroy.')); + return reject(new Error('Cannot destroy.')); self.backend.destroy(self.location, wrap(resolve, reject)); }); }; @@ -116,7 +116,7 @@ LowlevelUp.prototype.repair = function repair() { return new Promise(function(resolve, reject) { if (!self.backend.repair) - return utils.asyncify(reject)(new Error('Cannot repair.')); + return reject(new Error('Cannot repair.')); self.backend.repair(self.location, wrap(resolve, reject)); }); }; @@ -279,7 +279,7 @@ LowlevelUp.prototype.approximateSize = function approximateSize(start, end) { return new Promise(function(resolve, reject) { if (!self.binding.approximateSize) - return utils.asyncify(reject)(new Error('Cannot get size.')); + return reject(new Error('Cannot get size.')); self.binding.approximateSize(start, end, wrap(resolve, reject)); }); diff --git a/lib/http/rpc.js b/lib/http/rpc.js index 85096441..fb21702d 100644 --- a/lib/http/rpc.js +++ b/lib/http/rpc.js @@ -1683,11 +1683,10 @@ RPC.prototype._tmpl = co(function* _tmpl(version, coinbase, rules) { }); RPC.prototype._poll = co(function* _poll(lpid) { - var self = this; var watched, lastTX; if (typeof lpid !== 'string') - return null; + return; if (lpid.length !== 74) throw new RPCError('Invalid parameter.'); @@ -1701,7 +1700,7 @@ RPC.prototype._poll = co(function* _poll(lpid) { watched = utils.revHex(watched); if (this.chain.tip.hash !== watched) - return null; + return; yield this._onBlock(); }); @@ -2386,10 +2385,10 @@ RPC.prototype._createRedeem = co(function* _createRedeem(args) { }); /* - * Utility co(function* s + * Utility Functions */ -RPC.prototype.createmultisig = function createmultisig(args) { +RPC.prototype.createmultisig = co(function* createmultisig(args) { var script; if (args.help || args.length < 2 || args.length > 2) diff --git a/lib/primitives/mtx.js b/lib/primitives/mtx.js index 202c2ea6..b1206558 100644 --- a/lib/primitives/mtx.js +++ b/lib/primitives/mtx.js @@ -385,6 +385,30 @@ MTX.prototype.scriptVector = function scriptVector(prev, vector, ring) { return false; }; +/** + * Sign a transaction input on the worker pool + * (if workers are enabled). + * @param {Number} index + * @param {Buffer} key + * @param {SighashType?} type + * @param {Function} callback + */ + +MTX.prototype.signInputAsync = function signInputAsync(index, key, type) { + var result; + + if (!bcoin.useWorkers) { + try { + result = this.signInput(index, key, type); + } catch (e) { + return Promise.reject(e); + } + return Promise.resolve(result); + } + + return bcoin.workerPool.signInput(this, index, key, type); +}; + /** * Sign an input. * @param {Number} index - Index of input being signed. diff --git a/lib/primitives/tx.js b/lib/primitives/tx.js index f67d5472..8a24061c 100644 --- a/lib/primitives/tx.js +++ b/lib/primitives/tx.js @@ -734,6 +734,36 @@ TX.prototype.verifyAsync = function verifyAsync(flags) { return bcoin.workerPool.verify(this, flags); }; +/** + * Verify a transaction input asynchronously. + * @param {Number} index - Index of output being + * verified. + * @param {VerifyFlags} [flags=STANDARD_VERIFY_FLAGS] + * @returns {Boolean} Whether the input is valid. + */ + +TX.prototype.verifyInputAsync = function verifyInputAsync(index, flags) { + var input, result; + + if (!bcoin.useWorkers) { + try { + result = this.verifyInput(index, flags); + } catch (e) { + return Promise.reject(e); + } + return Promise.resolve(result); + } + + if (typeof index === 'object') + index = this.inputs.indexOf(index); + + input = this.inputs[index]; + + assert(input, 'Input does not exist.'); + + return bcoin.workerPool.verifyInput(this, index, flags); +}; + /** * Test whether the transaction is a coinbase * by examining the inputs. diff --git a/lib/utils/locker.js b/lib/utils/locker.js index d8491913..6fc6fa7c 100644 --- a/lib/utils/locker.js +++ b/lib/utils/locker.js @@ -32,6 +32,7 @@ function Locker(parent, add) { this.pending = []; this.pendingMap = {}; this.add = add; + this._unlock = this.unlock.bind(this); this._unlocker = this.unlocker.bind(this); } @@ -74,10 +75,10 @@ Locker.prototype.hasPending = function hasPending(key) { Locker.prototype.lock = function lock(arg1, arg2) { var self = this; - var force, obj; + var force, object; if (this.add) { - obj = arg1; + object = arg1; force = arg2; } else { force = arg1; @@ -85,16 +86,16 @@ Locker.prototype.lock = function lock(arg1, arg2) { if (force) { assert(this.busy); - return new Promise(this._force); + return new Promise(nop); } if (this.busy) { return new Promise(function(resolve, reject) { - if (obj) { - self.pending.push(obj); - self.pendingMap[obj.hash('hex')] = true; + if (object) { + self.pending.push(object); + self.pendingMap[object.hash('hex')] = true; } - self.jobs.push([resolve, obj]); + self.jobs.push([resolve, object]); }); } @@ -103,16 +104,23 @@ Locker.prototype.lock = function lock(arg1, arg2) { return new Promise(this._unlock); }; -Locker.prototype._force = function force(resolve, reject) { - resolve(utils.nop); -}; +/** + * Unlock callback to resolve a promise. + * @param {Function} resolve + * @param {Function} reject + */ Locker.prototype.unlock = function unlock(resolve, reject) { resolve(this._unlocker); }; +/** + * The actual unlock callback. + * @private + */ + Locker.prototype.unlocker = function unlocker() { - var item, resolve, obj; + var item, resolve, object; this.busy = false; @@ -124,14 +132,15 @@ Locker.prototype.unlocker = function unlocker() { item = this.jobs.shift(); resolve = item[0]; - obj = item[1]; + object = item[1]; - if (obj) { - assert(obj === this.pending.shift()); - delete this.pendingMap[obj.hash('hex')]; + if (object) { + assert(object === this.pending.shift()); + delete this.pendingMap[object.hash('hex')]; } this.busy = true; + resolve(this._unlocker); }; @@ -140,10 +149,8 @@ Locker.prototype.unlocker = function unlocker() { */ Locker.prototype.destroy = function destroy() { - if (this.add) { - this.pending.length = 0; - this.pendingMap = {}; - } + this.pending.length = 0; + this.pendingMap = {}; this.jobs.length = 0; }; @@ -178,7 +185,7 @@ function MappedLock(parent) { return MappedLock.create(parent); this.parent = parent; - this.jobs = []; + this.jobs = {}; this.busy = {}; } @@ -213,21 +220,21 @@ MappedLock.prototype.lock = function lock(key, force) { if (force || key == null) { assert(key == null || this.busy[key]); - return new Promise(function(resolve, reject) { - resolve(function unlock() {}); - }); + return new Promise(nop); } if (this.busy[key]) { return new Promise(function(resolve, reject) { - self.jobs.push([resolve, key]); + if (!self.jobs[key]) + self.jobs[key] = []; + self.jobs[key].push(resolve); }); } this.busy[key] = true; return new Promise(function(resolve, reject) { - resolve(self._unlock(key)); + resolve(self.unlock(key)); }); }; @@ -238,23 +245,37 @@ MappedLock.prototype.lock = function lock(key, force) { * @returns {Function} Unlocker. */ -MappedLock.prototype._unlock = function _unlock(key) { +MappedLock.prototype.unlock = function unlock(key) { var self = this; return function unlock() { - var item; + var jobs = self.jobs[key]; + var resolve; delete self.busy[key]; - if (self.jobs.length === 0) + if (!jobs) return; - item = self.jobs.shift(); + resolve = jobs.shift(); + assert(resolve); - self.busy = true; - item[0](self._unlock(item[1])); + if (jobs.length === 0) + delete self.jobs[key]; + + self.busy[key] = true; + + resolve(unlock); }; }; +/* + * Helpers + */ + +function nop(resolve, reject) { + resolve(utils.nop); +} + /* * Expose */ diff --git a/lib/utils/spawn.js b/lib/utils/spawn.js index 0a1d23ba..80afc313 100644 --- a/lib/utils/spawn.js +++ b/lib/utils/spawn.js @@ -77,6 +77,30 @@ function co(generator) { }; } +/** + * Wrap a generator function to be + * executed into a function that + * returns a promise. + * @param {GeneratorFunction} + * @returns {Function} + */ + +function col(name, generator) { + var func = co(generator); + return co(function *() { + var unlock = yield this[name].lock(); + var result; + try { + result = yield func.apply(this, arguments); + } catch (e) { + unlock(); + throw e; + } + unlock(); + return result; + }); +} + /** * Wrap a generator function to be * executed into a function that @@ -222,7 +246,7 @@ function call(func) { return reject(err); resolve(result); }); - func.apply(self, args); + func.apply(null, args); }); } @@ -262,6 +286,7 @@ exports = spawn; exports.exec = exec; exports.spawn = spawn; exports.co = co; +exports.col = col; exports.cob = cob; exports.con = con; exports.cb = cb; diff --git a/lib/wallet/txdb.js b/lib/wallet/txdb.js index 1a437f37..38b793dd 100644 --- a/lib/wallet/txdb.js +++ b/lib/wallet/txdb.js @@ -570,7 +570,7 @@ TXDB.prototype._resolveOrphans = co(function* _resolveOrphans(tx, index) { // Verify that input script is correct, if not - add // output to unspent and remove orphan from storage - if (!this.options.verify || orphan.verifyInput(input.index)) { + if (!this.options.verify || (yield orphan.verifyInputAsync(input.index))) { this.put(layout.d(input.hash, input.index), coin.toRaw()); return true; } @@ -610,6 +610,7 @@ TXDB.prototype.add = co(function* add(tx, info) { return true; } + // Verify and get coins. result = yield this._verify(tx, info); if (!result) { diff --git a/lib/wallet/wallet.js b/lib/wallet/wallet.js index 934e5ce9..a417bdbb 100644 --- a/lib/wallet/wallet.js +++ b/lib/wallet/wallet.js @@ -734,7 +734,7 @@ Wallet.prototype.getPath = co(function* getPath(address) { Wallet.prototype.getPaths = co(function* getPaths(account) { var out = []; - var i, account, paths, path; + var i, paths, path; account = yield this._getIndex(account); paths = yield this.db.getWalletPaths(this.wid); @@ -761,7 +761,7 @@ Wallet.prototype.getPaths = co(function* getPaths(account) { Wallet.prototype.importKey = co(function* importKey(account, ring, passphrase) { var unlock = yield this._lockWrite(); - var exists, account, raw, path; + var exists, raw, path; if (account && typeof account === 'object') { passphrase = ring; diff --git a/lib/workers/jobs.js b/lib/workers/jobs.js index 0733d320..2141c529 100644 --- a/lib/workers/jobs.js +++ b/lib/workers/jobs.js @@ -29,11 +29,24 @@ jobs.verify = function verify(tx, flags) { }; /** - * Execute Wallet.sign() on worker. - * @see Wallet.sign - * @param {KeyRing[]} rings - * @param {HDPrivateKey} master + * Execute tx.verifyInput() on worker. + * @see TX#verifyInput + * @param {TX} tx + * @param {Number} index + * @param {VerifyFlags} flags + * @returns {Boolean} + */ + +jobs.verifyInput = function verifyInput(tx, index, flags) { + return tx.verifyInput(index, flags); +}; + +/** + * Execute tx.sign() on worker. + * @see MTX#sign * @param {MTX} tx + * @param {KeyRing[]} ring + * @param {SighashType} type */ jobs.sign = function sign(tx, ring, type) { @@ -49,6 +62,50 @@ jobs.sign = function sign(tx, ring, type) { return [sigs, total]; }; +/** + * Execute tx.signInput() on worker. + * @see MTX#signInput + * @param {MTX} tx + * @param {Number} index + * @param {Buffer} key + * @param {SighashType} type + */ + +jobs.signInput = function signInput(tx, index, key, type) { + var result = tx.signInput(tx, index, key, type); + var input = tx.inputs[index]; + + if (!result) + return null; + + return [input.script, input.witness]; +}; + +/** + * Execute ec.verify() on worker. + * @see ec.verify + * @param {TX} tx + * @param {VerifyFlags} flags + * @returns {Boolean} + */ + +jobs.ecVerify = function ecVerify(msg, sig, key) { + return bcoin.ec.verify(msg, sig, key); +}; + +/** + * Execute ec.sign() on worker. + * @see ec.sign + * @param {TX} tx + * @param {Number} index + * @param {VerifyFlags} flags + * @returns {Boolean} + */ + +jobs.ecSign = function ecSign(msg, key) { + return bcoin.ec.sign(msg, key); +}; + /** * Mine a block on worker. * @param {Object} attempt - Naked {@link MinerBlock}. @@ -56,8 +113,8 @@ jobs.sign = function sign(tx, ring, type) { */ jobs.mine = function mine(attempt) { - attempt.on('status', function(stat) { - bcoin.master.sendEvent('status', stat); + attempt.on('status', function(status) { + bcoin.master.sendEvent('status', status); }); return attempt.mineSync(); }; diff --git a/lib/workers/workers.js b/lib/workers/workers.js index 27ad3805..82e00d1d 100644 --- a/lib/workers/workers.js +++ b/lib/workers/workers.js @@ -240,11 +240,23 @@ Workers.prototype.verify = function verify(tx, flags) { return this.execute('verify', [tx, flags], -1); }; +/** + * Execute the tx input verification job (default timeout). + * @param {TX} tx + * @param {Number} index + * @param {VerifyFlags} flags + * @param {Function} callback - Returns [Error, Boolean]. + */ + +Workers.prototype.verifyInput = function verifyInput(tx, index, flags) { + return this.execute('verifyInput', [tx, index, flags], -1); +}; + /** * Execute the tx signing job (default timeout). - * @param {KeyRing[]} rings - * @param {HDPrivateKey} master * @param {MTX} tx + * @param {KeyRing[]} ring + * @param {SighashType} type * @param {Function} callback */ @@ -266,6 +278,51 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) { return total; }); +/** + * Execute the tx input signing job (default timeout). + * @param {MTX} tx + * @param {Number} index + * @param {Buffer} key + * @param {SighashType} type + * @param {Function} callback + */ + +Workers.prototype.signInput = co(function* signInput(tx, index, key, type) { + var sig = yield this.execute('signInput', [tx, index, key, type], -1); + var input = tx.inputs[index]; + + if (!sig) + return false; + + input.script = sig[0]; + input.witness = sig[1]; + + return true; +}); + +/** + * Execute the ec verify job (no timeout). + * @param {Buffer} msg + * @param {Buffer} sig - DER formatted. + * @param {Buffer} key + * @param {Function} callback + */ + +Workers.prototype.ecVerify = function ecVerify(msg, sig, key) { + return this.execute('ecVerify', [msg, sig, key], -1); +}; + +/** + * Execute the ec signing job (no timeout). + * @param {Buffer} msg + * @param {Buffer} key + * @param {Function} callback + */ + +Workers.prototype.ecSign = function ecSign(msg, key) { + return this.execute('ecSign', [msg, key], -1); +}; + /** * Execute the mining job (no timeout). * @param {MinerBlock} attempt @@ -273,7 +330,7 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) { */ Workers.prototype.mine = function mine(attempt) { - this.execute('mine', [attempt], -1); + return this.execute('mine', [attempt], -1); }; /** @@ -289,7 +346,7 @@ Workers.prototype.mine = function mine(attempt) { */ Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) { - this.execute('scrypt', [passwd, salt, N, r, p, len], -1); + return this.execute('scrypt', [passwd, salt, N, r, p, len], -1); }; /**