refactor: fixes. workers. locks.

This commit is contained in:
Christopher Jeffrey 2016-09-22 02:49:18 -07:00
parent 8c11a2aa3f
commit f158a73a2d
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
13 changed files with 272 additions and 58 deletions

View File

@ -7,6 +7,7 @@ var utils = require('../lib/utils/utils');
var spawn = require('../lib/utils/spawn');
var Client = require('../lib/http/client');
var Wallet = require('../lib/http/wallet');
var co = spawn.co;
var assert = utils.assert;
var main;

View File

@ -894,7 +894,6 @@ Chain.prototype.setBestChain = co(function* setBestChain(entry, block, prev) {
}
// Otherwise, everything is in order.
// Do "contextual" verification on our block
// now that we're certain its previous
// block is in the chain.

View File

@ -1069,7 +1069,7 @@ ChainDB.prototype.fillCoins = co(function* fillCoins(tx) {
*/
ChainDB.prototype.fillHistory = co(function* fillHistory(tx) {
var i, input, tx;
var i, input, ptx;
if (!this.options.indexTX)
return tx;
@ -1083,10 +1083,10 @@ ChainDB.prototype.fillHistory = co(function* fillHistory(tx) {
if (input.coin)
continue;
tx = yield this.getTX(input.prevout.hash);
ptx = yield this.getTX(input.prevout.hash);
if (tx)
input.coin = bcoin.coin.fromTX(tx, input.prevout.index);
if (ptx)
input.coin = bcoin.coin.fromTX(ptx, input.prevout.index);
}
return tx;

View File

@ -97,7 +97,7 @@ LowlevelUp.prototype.destroy = function destroy() {
return new Promise(function(resolve, reject) {
if (!self.backend.destroy)
return utils.asyncify(reject)(new Error('Cannot destroy.'));
return reject(new Error('Cannot destroy.'));
self.backend.destroy(self.location, wrap(resolve, reject));
});
};
@ -116,7 +116,7 @@ LowlevelUp.prototype.repair = function repair() {
return new Promise(function(resolve, reject) {
if (!self.backend.repair)
return utils.asyncify(reject)(new Error('Cannot repair.'));
return reject(new Error('Cannot repair.'));
self.backend.repair(self.location, wrap(resolve, reject));
});
};
@ -279,7 +279,7 @@ LowlevelUp.prototype.approximateSize = function approximateSize(start, end) {
return new Promise(function(resolve, reject) {
if (!self.binding.approximateSize)
return utils.asyncify(reject)(new Error('Cannot get size.'));
return reject(new Error('Cannot get size.'));
self.binding.approximateSize(start, end, wrap(resolve, reject));
});

View File

@ -1683,11 +1683,10 @@ RPC.prototype._tmpl = co(function* _tmpl(version, coinbase, rules) {
});
RPC.prototype._poll = co(function* _poll(lpid) {
var self = this;
var watched, lastTX;
if (typeof lpid !== 'string')
return null;
return;
if (lpid.length !== 74)
throw new RPCError('Invalid parameter.');
@ -1701,7 +1700,7 @@ RPC.prototype._poll = co(function* _poll(lpid) {
watched = utils.revHex(watched);
if (this.chain.tip.hash !== watched)
return null;
return;
yield this._onBlock();
});
@ -2386,10 +2385,10 @@ RPC.prototype._createRedeem = co(function* _createRedeem(args) {
});
/*
* Utility co(function* s
* Utility Functions
*/
RPC.prototype.createmultisig = function createmultisig(args) {
RPC.prototype.createmultisig = co(function* createmultisig(args) {
var script;
if (args.help || args.length < 2 || args.length > 2)

View File

@ -385,6 +385,30 @@ MTX.prototype.scriptVector = function scriptVector(prev, vector, ring) {
return false;
};
/**
* Sign a transaction input on the worker pool
* (if workers are enabled).
* @param {Number} index
* @param {Buffer} key
* @param {SighashType?} type
* @param {Function} callback
*/
MTX.prototype.signInputAsync = function signInputAsync(index, key, type) {
var result;
if (!bcoin.useWorkers) {
try {
result = this.signInput(index, key, type);
} catch (e) {
return Promise.reject(e);
}
return Promise.resolve(result);
}
return bcoin.workerPool.signInput(this, index, key, type);
};
/**
* Sign an input.
* @param {Number} index - Index of input being signed.

View File

@ -734,6 +734,36 @@ TX.prototype.verifyAsync = function verifyAsync(flags) {
return bcoin.workerPool.verify(this, flags);
};
/**
* Verify a transaction input asynchronously.
* @param {Number} index - Index of output being
* verified.
* @param {VerifyFlags} [flags=STANDARD_VERIFY_FLAGS]
* @returns {Boolean} Whether the input is valid.
*/
TX.prototype.verifyInputAsync = function verifyInputAsync(index, flags) {
var input, result;
if (!bcoin.useWorkers) {
try {
result = this.verifyInput(index, flags);
} catch (e) {
return Promise.reject(e);
}
return Promise.resolve(result);
}
if (typeof index === 'object')
index = this.inputs.indexOf(index);
input = this.inputs[index];
assert(input, 'Input does not exist.');
return bcoin.workerPool.verifyInput(this, index, flags);
};
/**
* Test whether the transaction is a coinbase
* by examining the inputs.

View File

@ -32,6 +32,7 @@ function Locker(parent, add) {
this.pending = [];
this.pendingMap = {};
this.add = add;
this._unlock = this.unlock.bind(this);
this._unlocker = this.unlocker.bind(this);
}
@ -74,10 +75,10 @@ Locker.prototype.hasPending = function hasPending(key) {
Locker.prototype.lock = function lock(arg1, arg2) {
var self = this;
var force, obj;
var force, object;
if (this.add) {
obj = arg1;
object = arg1;
force = arg2;
} else {
force = arg1;
@ -85,16 +86,16 @@ Locker.prototype.lock = function lock(arg1, arg2) {
if (force) {
assert(this.busy);
return new Promise(this._force);
return new Promise(nop);
}
if (this.busy) {
return new Promise(function(resolve, reject) {
if (obj) {
self.pending.push(obj);
self.pendingMap[obj.hash('hex')] = true;
if (object) {
self.pending.push(object);
self.pendingMap[object.hash('hex')] = true;
}
self.jobs.push([resolve, obj]);
self.jobs.push([resolve, object]);
});
}
@ -103,16 +104,23 @@ Locker.prototype.lock = function lock(arg1, arg2) {
return new Promise(this._unlock);
};
Locker.prototype._force = function force(resolve, reject) {
resolve(utils.nop);
};
/**
* Unlock callback to resolve a promise.
* @param {Function} resolve
* @param {Function} reject
*/
Locker.prototype.unlock = function unlock(resolve, reject) {
resolve(this._unlocker);
};
/**
* The actual unlock callback.
* @private
*/
Locker.prototype.unlocker = function unlocker() {
var item, resolve, obj;
var item, resolve, object;
this.busy = false;
@ -124,14 +132,15 @@ Locker.prototype.unlocker = function unlocker() {
item = this.jobs.shift();
resolve = item[0];
obj = item[1];
object = item[1];
if (obj) {
assert(obj === this.pending.shift());
delete this.pendingMap[obj.hash('hex')];
if (object) {
assert(object === this.pending.shift());
delete this.pendingMap[object.hash('hex')];
}
this.busy = true;
resolve(this._unlocker);
};
@ -140,10 +149,8 @@ Locker.prototype.unlocker = function unlocker() {
*/
Locker.prototype.destroy = function destroy() {
if (this.add) {
this.pending.length = 0;
this.pendingMap = {};
}
this.pending.length = 0;
this.pendingMap = {};
this.jobs.length = 0;
};
@ -178,7 +185,7 @@ function MappedLock(parent) {
return MappedLock.create(parent);
this.parent = parent;
this.jobs = [];
this.jobs = {};
this.busy = {};
}
@ -213,21 +220,21 @@ MappedLock.prototype.lock = function lock(key, force) {
if (force || key == null) {
assert(key == null || this.busy[key]);
return new Promise(function(resolve, reject) {
resolve(function unlock() {});
});
return new Promise(nop);
}
if (this.busy[key]) {
return new Promise(function(resolve, reject) {
self.jobs.push([resolve, key]);
if (!self.jobs[key])
self.jobs[key] = [];
self.jobs[key].push(resolve);
});
}
this.busy[key] = true;
return new Promise(function(resolve, reject) {
resolve(self._unlock(key));
resolve(self.unlock(key));
});
};
@ -238,23 +245,37 @@ MappedLock.prototype.lock = function lock(key, force) {
* @returns {Function} Unlocker.
*/
MappedLock.prototype._unlock = function _unlock(key) {
MappedLock.prototype.unlock = function unlock(key) {
var self = this;
return function unlock() {
var item;
var jobs = self.jobs[key];
var resolve;
delete self.busy[key];
if (self.jobs.length === 0)
if (!jobs)
return;
item = self.jobs.shift();
resolve = jobs.shift();
assert(resolve);
self.busy = true;
item[0](self._unlock(item[1]));
if (jobs.length === 0)
delete self.jobs[key];
self.busy[key] = true;
resolve(unlock);
};
};
/*
* Helpers
*/
function nop(resolve, reject) {
resolve(utils.nop);
}
/*
* Expose
*/

View File

@ -77,6 +77,30 @@ function co(generator) {
};
}
/**
* Wrap a generator function to be
* executed into a function that
* returns a promise.
* @param {GeneratorFunction}
* @returns {Function}
*/
function col(name, generator) {
var func = co(generator);
return co(function *() {
var unlock = yield this[name].lock();
var result;
try {
result = yield func.apply(this, arguments);
} catch (e) {
unlock();
throw e;
}
unlock();
return result;
});
}
/**
* Wrap a generator function to be
* executed into a function that
@ -222,7 +246,7 @@ function call(func) {
return reject(err);
resolve(result);
});
func.apply(self, args);
func.apply(null, args);
});
}
@ -262,6 +286,7 @@ exports = spawn;
exports.exec = exec;
exports.spawn = spawn;
exports.co = co;
exports.col = col;
exports.cob = cob;
exports.con = con;
exports.cb = cb;

View File

@ -570,7 +570,7 @@ TXDB.prototype._resolveOrphans = co(function* _resolveOrphans(tx, index) {
// Verify that input script is correct, if not - add
// output to unspent and remove orphan from storage
if (!this.options.verify || orphan.verifyInput(input.index)) {
if (!this.options.verify || (yield orphan.verifyInputAsync(input.index))) {
this.put(layout.d(input.hash, input.index), coin.toRaw());
return true;
}
@ -610,6 +610,7 @@ TXDB.prototype.add = co(function* add(tx, info) {
return true;
}
// Verify and get coins.
result = yield this._verify(tx, info);
if (!result) {

View File

@ -734,7 +734,7 @@ Wallet.prototype.getPath = co(function* getPath(address) {
Wallet.prototype.getPaths = co(function* getPaths(account) {
var out = [];
var i, account, paths, path;
var i, paths, path;
account = yield this._getIndex(account);
paths = yield this.db.getWalletPaths(this.wid);
@ -761,7 +761,7 @@ Wallet.prototype.getPaths = co(function* getPaths(account) {
Wallet.prototype.importKey = co(function* importKey(account, ring, passphrase) {
var unlock = yield this._lockWrite();
var exists, account, raw, path;
var exists, raw, path;
if (account && typeof account === 'object') {
passphrase = ring;

View File

@ -29,11 +29,24 @@ jobs.verify = function verify(tx, flags) {
};
/**
* Execute Wallet.sign() on worker.
* @see Wallet.sign
* @param {KeyRing[]} rings
* @param {HDPrivateKey} master
* Execute tx.verifyInput() on worker.
* @see TX#verifyInput
* @param {TX} tx
* @param {Number} index
* @param {VerifyFlags} flags
* @returns {Boolean}
*/
jobs.verifyInput = function verifyInput(tx, index, flags) {
return tx.verifyInput(index, flags);
};
/**
* Execute tx.sign() on worker.
* @see MTX#sign
* @param {MTX} tx
* @param {KeyRing[]} ring
* @param {SighashType} type
*/
jobs.sign = function sign(tx, ring, type) {
@ -49,6 +62,50 @@ jobs.sign = function sign(tx, ring, type) {
return [sigs, total];
};
/**
* Execute tx.signInput() on worker.
* @see MTX#signInput
* @param {MTX} tx
* @param {Number} index
* @param {Buffer} key
* @param {SighashType} type
*/
jobs.signInput = function signInput(tx, index, key, type) {
var result = tx.signInput(tx, index, key, type);
var input = tx.inputs[index];
if (!result)
return null;
return [input.script, input.witness];
};
/**
* Execute ec.verify() on worker.
* @see ec.verify
* @param {TX} tx
* @param {VerifyFlags} flags
* @returns {Boolean}
*/
jobs.ecVerify = function ecVerify(msg, sig, key) {
return bcoin.ec.verify(msg, sig, key);
};
/**
* Execute ec.sign() on worker.
* @see ec.sign
* @param {TX} tx
* @param {Number} index
* @param {VerifyFlags} flags
* @returns {Boolean}
*/
jobs.ecSign = function ecSign(msg, key) {
return bcoin.ec.sign(msg, key);
};
/**
* Mine a block on worker.
* @param {Object} attempt - Naked {@link MinerBlock}.
@ -56,8 +113,8 @@ jobs.sign = function sign(tx, ring, type) {
*/
jobs.mine = function mine(attempt) {
attempt.on('status', function(stat) {
bcoin.master.sendEvent('status', stat);
attempt.on('status', function(status) {
bcoin.master.sendEvent('status', status);
});
return attempt.mineSync();
};

View File

@ -240,11 +240,23 @@ Workers.prototype.verify = function verify(tx, flags) {
return this.execute('verify', [tx, flags], -1);
};
/**
* Execute the tx input verification job (default timeout).
* @param {TX} tx
* @param {Number} index
* @param {VerifyFlags} flags
* @param {Function} callback - Returns [Error, Boolean].
*/
Workers.prototype.verifyInput = function verifyInput(tx, index, flags) {
return this.execute('verifyInput', [tx, index, flags], -1);
};
/**
* Execute the tx signing job (default timeout).
* @param {KeyRing[]} rings
* @param {HDPrivateKey} master
* @param {MTX} tx
* @param {KeyRing[]} ring
* @param {SighashType} type
* @param {Function} callback
*/
@ -266,6 +278,51 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) {
return total;
});
/**
* Execute the tx input signing job (default timeout).
* @param {MTX} tx
* @param {Number} index
* @param {Buffer} key
* @param {SighashType} type
* @param {Function} callback
*/
Workers.prototype.signInput = co(function* signInput(tx, index, key, type) {
var sig = yield this.execute('signInput', [tx, index, key, type], -1);
var input = tx.inputs[index];
if (!sig)
return false;
input.script = sig[0];
input.witness = sig[1];
return true;
});
/**
* Execute the ec verify job (no timeout).
* @param {Buffer} msg
* @param {Buffer} sig - DER formatted.
* @param {Buffer} key
* @param {Function} callback
*/
Workers.prototype.ecVerify = function ecVerify(msg, sig, key) {
return this.execute('ecVerify', [msg, sig, key], -1);
};
/**
* Execute the ec signing job (no timeout).
* @param {Buffer} msg
* @param {Buffer} key
* @param {Function} callback
*/
Workers.prototype.ecSign = function ecSign(msg, key) {
return this.execute('ecSign', [msg, key], -1);
};
/**
* Execute the mining job (no timeout).
* @param {MinerBlock} attempt
@ -273,7 +330,7 @@ Workers.prototype.sign = co(function* sign(tx, ring, type) {
*/
Workers.prototype.mine = function mine(attempt) {
this.execute('mine', [attempt], -1);
return this.execute('mine', [attempt], -1);
};
/**
@ -289,7 +346,7 @@ Workers.prototype.mine = function mine(attempt) {
*/
Workers.prototype.scrypt = function scrypt(passwd, salt, N, r, p, len) {
this.execute('scrypt', [passwd, salt, N, r, p, len], -1);
return this.execute('scrypt', [passwd, salt, N, r, p, len], -1);
};
/**