utils: remove dependence on co.

This commit is contained in:
Christopher Jeffrey 2017-11-01 14:12:18 -07:00
parent 277ac9a62a
commit 8332b93721
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
17 changed files with 54 additions and 384 deletions

View File

@ -2,7 +2,6 @@
const bcoin = require('../..');
const encoding = bcoin.encoding;
const co = bcoin.co;
const Outpoint = bcoin.outpoint;
const MTX = bcoin.mtx;
const HTTP = bcoin.http;
@ -50,7 +49,7 @@ async function fundWallet(wdb, addr) {
});
await wdb.addTX(tx);
await co.timeout(300);
await new Promise(r => setTimeout(r, 300));
}
async function sendTX(addr, value) {

View File

@ -22,7 +22,6 @@ const ChainEntry = require('./chainentry');
const CoinView = require('../coins/coinview');
const Script = require('../script/script');
const {VerifyError} = require('../protocol/errors');
const co = require('../utils/co');
const encoding = require('bbuf/lib/encoding');
const thresholdStates = common.thresholdStates;
@ -817,11 +816,15 @@ Chain.prototype.verifyInputs = async function verifyInputs(block, prev, state) {
}
// Verify all txs in parallel.
if (!await co.every(jobs)) {
throw new VerifyError(block,
'invalid',
'mandatory-script-verify-flag-failed',
100);
const results = await Promise.all(jobs);
for (const result of results) {
if (!result) {
throw new VerifyError(block,
'invalid',
'mandatory-script-verify-flag-failed',
100);
}
}
return view;

View File

@ -10,7 +10,6 @@
const assert = require('assert');
const EventEmitter = require('events');
const util = require('../utils/util');
const co = require('../utils/co');
const mine = require('./mine');
const Lock = require('../utils/lock');
const encoding = require('bbuf/lib/encoding');
@ -237,7 +236,7 @@ CPUMiner.prototype._stop = async function _stop() {
CPUMiner.prototype.wait = function wait() {
return new Promise((resolve, reject) => {
assert(!this.stopJob);
this.stopJob = co.job(resolve, reject);
this.stopJob = { resolve, reject };
});
};

View File

@ -23,7 +23,6 @@ const secp256k1 = require('bcrypto/lib/secp256k1');
const StaticWriter = require('bbuf/lib/staticwriter');
const encoding = require('bbuf/lib/encoding');
const base58 = require('bstr/lib/base58');
const co = require('../utils/co');
const packets = require('./packets');
/**
@ -410,7 +409,7 @@ BIP150.prototype.wait = function wait(timeout) {
BIP150.prototype._wait = function _wait(timeout, resolve, reject) {
assert(!this.auth, 'Cannot wait for init after handshake.');
this.job = co.job(resolve, reject);
this.job = { resolve, reject };
if (this.outbound && !this.peerIdentity) {
this.reject(new Error(`No identity for ${this.hostname}.`));

View File

@ -16,7 +16,6 @@ const assert = require('assert');
const EventEmitter = require('events');
const {format} = require('util');
const util = require('../utils/util');
const co = require('../utils/co');
const hash256 = require('bcrypto/lib/hash256');
const sha256 = require('bcrypto/lib/sha256');
const ChaCha20 = require('bcrypto/lib/chacha20');
@ -527,7 +526,7 @@ BIP151.prototype.wait = function wait(timeout) {
BIP151.prototype._wait = function _wait(timeout, resolve, reject) {
assert(!this.handshake, 'Cannot wait for init after handshake.');
this.job = co.job(resolve, reject);
this.job = { resolve, reject };
this.timeout = setTimeout(() => {
this.reject(new Error('BIP151 handshake timed out.'));

View File

@ -14,7 +14,6 @@ const dns = require('bdns');
const Logger = require('blgr');
const murmur3 = require('bfilter/lib/murmur3');
const util = require('../utils/util');
const co = require('../utils/co');
const Network = require('../protocol/network');
const NetAddress = require('../primitives/netaddress');
const List = require('../utils/list');
@ -53,6 +52,7 @@ function HostList(options) {
this.timer = null;
this.needsFlush = false;
this.flushing = false;
this.init();
}
@ -201,7 +201,7 @@ HostList.prototype.start = function start() {
return;
assert(this.timer == null);
this.timer = co.setInterval(this.flush, this.options.flushInterval, this);
this.timer = setInterval(() => this.flush(), this.options.flushInterval);
};
/**
@ -216,7 +216,7 @@ HostList.prototype.stop = function stop() {
return;
assert(this.timer != null);
co.clearInterval(this.timer);
clearInterval(this.timer);
this.timer = null;
};
@ -298,6 +298,9 @@ HostList.prototype.flush = async function flush() {
if (!this.needsFlush)
return;
if (this.flushing)
return;
this.needsFlush = false;
this.logger.debug('Writing hosts to %s.', filename);
@ -305,12 +308,16 @@ HostList.prototype.flush = async function flush() {
const json = this.toJSON();
const data = JSON.stringify(json);
this.flushing = true;
try {
await fs.writeFile(filename, data, 'utf8');
} catch (e) {
this.logger.warning('Writing hosts failed.');
this.logger.error(e);
}
this.flushing = false;
};
/**

View File

@ -14,7 +14,6 @@ const tcp = require('btcp');
const Logger = require('blgr');
const encoding = require('bbuf/lib/encoding');
const RollingFilter = require('bfilter/lib/rolling');
const co = require('../utils/co');
const Parser = require('./parser');
const Framer = require('./framer');
const packets = require('./packets');
@ -1141,7 +1140,7 @@ Peer.prototype.drain = function drain() {
return Promise.resolve();
return new Promise((resolve, reject) => {
this.drainQueue.push(co.job(resolve, reject));
this.drainQueue.push({ resolve, reject });
});
};
@ -2435,7 +2434,7 @@ function RequestEntry() {
}
RequestEntry.prototype.addJob = function addJob(resolve, reject) {
this.jobs.push(co.job(resolve, reject));
this.jobs.push({ resolve, reject });
};
RequestEntry.prototype.setTimeout = function setTimeout(timeout) {

View File

@ -19,7 +19,6 @@ const RollingFilter = require('bfilter/lib/rolling');
const secp256k1 = require('bcrypto/lib/secp256k1');
const encoding = require('bbuf/lib/encoding');
const util = require('../utils/util');
const co = require('../utils/co');
const common = require('./common');
const chainCommon = require('../blockchain/common');
const Address = require('../primitives/address');
@ -93,6 +92,7 @@ function Pool(options) {
this.connected = false;
this.disconnecting = false;
this.syncing = false;
this.discovering = false;
this.spvFilter = null;
this.txFilter = null;
this.blockMap = new Set();
@ -411,7 +411,7 @@ Pool.prototype.unlisten = async function unlisten() {
Pool.prototype.startTimer = function startTimer() {
assert(this.timer == null, 'Timer already started.');
this.timer = co.setInterval(this.discover, Pool.DISCOVERY_INTERVAL, this);
this.timer = setInterval(() => this.discover(), Pool.DISCOVERY_INTERVAL);
};
/**
@ -421,7 +421,7 @@ Pool.prototype.startTimer = function startTimer() {
Pool.prototype.stopTimer = function stopTimer() {
assert(this.timer != null, 'Timer already stopped.');
co.clearInterval(this.timer);
clearInterval(this.timer);
this.timer = null;
};
@ -432,8 +432,16 @@ Pool.prototype.stopTimer = function stopTimer() {
*/
Pool.prototype.discover = async function discover() {
await this.discoverGateway();
await this.discoverSeeds(true);
if (this.discovering)
return;
try {
this.discovering = true;
await this.discoverGateway();
await this.discoverSeeds(true);
} finally {
this.discovering = false;
}
};
/**
@ -4202,7 +4210,7 @@ Object.setPrototypeOf(BroadcastItem.prototype, EventEmitter.prototype);
*/
BroadcastItem.prototype.addJob = function addJob(resolve, reject) {
this.jobs.push(co.job(resolve, reject));
this.jobs.push({ resolve, reject });
};
/**

View File

@ -9,7 +9,6 @@
const assert = require('assert');
const bweb = require('bweb');
const util = require('../utils/util');
const co = require('../utils/co');
const digest = require('bcrypto/lib/digest');
const ccmp = require('bcrypto/lib/ccmp');
const common = require('../blockchain/common');
@ -2296,7 +2295,7 @@ class RPC extends RPCBase {
longpoll() {
return new Promise((resolve, reject) => {
this.pollers.push(co.job(resolve, reject));
this.pollers.push({ resolve, reject });
});
}

View File

@ -1,334 +0,0 @@
/*!
* co.js - promise and generator control flow for bcoin
* Originally based on yoursnetwork's "asink" module.
* Copyright (c) 2014-2017, Christopher Jeffrey (MIT License).
* https://github.com/bcoin-org/bcoin
*/
'use strict';
/**
* @module utils/co
*/
const assert = require('assert');
/**
* Execute an instantiated generator.
* @param {Generator} gen
* @returns {Promise}
*/
function exec(gen) {
return new Promise((resolve, reject) => {
const step = (value, rejection) => {
let next;
try {
if (rejection)
next = gen.throw(value);
else
next = gen.next(value);
} catch (e) {
reject(e);
return;
}
if (next.done) {
resolve(next.value);
return;
}
if (!isPromise(next.value)) {
step(next.value, false);
return;
}
// eslint-disable-next-line no-use-before-define
next.value.then(succeed, fail);
};
const succeed = (value) => {
step(value, false);
};
const fail = (value) => {
step(value, true);
};
step(undefined, false);
});
}
/**
* Execute generator function
* with a context and execute.
* @param {GeneratorFunction} generator
* @param {Object?} self
* @returns {Promise}
*/
function spawn(generator, self) {
const gen = generator.call(self);
return exec(gen);
}
/**
* Wrap a generator function to be
* executed into a function that
* returns a promise.
* @param {GeneratorFunction}
* @returns {Function}
*/
function co(generator) {
return function() {
const gen = generator.apply(this, arguments);
return exec(gen);
};
}
/**
* Test whether an object is a promise.
* @param {Object} obj
* @returns {Boolean}
*/
function isPromise(obj) {
return obj && typeof obj.then === 'function';
}
/**
* Wait for a nextTick with a promise.
* @returns {Promise}
*/
function wait() {
return new Promise(resolve => setImmediate(resolve));
};
/**
* Wait for a timeout with a promise.
* @param {Number} time
* @returns {Promise}
*/
function timeout(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
/**
* Wrap `resolve` and `reject` into
* a node.js style callback.
* @param {Function} resolve
* @param {Function} reject
* @returns {Function}
*/
function wrap(resolve, reject) {
return function(err, result) {
if (err) {
reject(err);
return;
}
resolve(result);
};
}
/**
* Wrap a function that accepts node.js
* style callbacks into a function that
* returns a promise.
* @param {Function} func
* @returns {AsyncFunction}
*/
function promisify(func) {
return function(...args) {
return new Promise((resolve, reject) => {
args.push(wrap(resolve, reject));
func.call(this, ...args);
});
};
}
/**
* Wrap a promise-returning function
* into a function that accepts a
* node.js style callback.
* @param {AsyncFunction} func
* @returns {Function}
*/
function callbackify(func) {
return function(...args) {
if (args.length === 0
|| typeof args[args.length - 1] !== 'function') {
throw new Error(`${func.name || 'Function'} requires a callback.`);
}
const callback = args.pop();
func.call(this, ...args).then((value) => {
setImmediate(() => callback(null, value));
}, (err) => {
setImmediate(() => callback(err));
});
};
}
/**
* Execute each promise and
* have them pass a truth test.
* @method
* @param {Promise[]} jobs
* @returns {Promise}
*/
async function every(jobs) {
const result = await Promise.all(jobs);
for (const item of result) {
if (!item)
return false;
}
return true;
}
/**
* Start an interval. Wait for promise
* to resolve on each iteration.
* @param {Function} func
* @param {Number?} time
* @param {Object?} self
* @returns {Object}
*/
function startInterval(func, time, self) {
const ctx = {
timer: null,
stopped: false,
running: false,
resolve: null
};
const cb = async () => {
assert(ctx.timer != null);
ctx.timer = null;
try {
ctx.running = true;
await func.call(self);
} finally {
ctx.running = false;
if (!ctx.stopped)
ctx.timer = setTimeout(cb, time);
else if (ctx.resolve)
ctx.resolve();
}
};
ctx.timer = setTimeout(cb, time);
return ctx;
}
/**
* Clear an interval.
* @param {Object} ctx
*/
function stopInterval(ctx) {
assert(ctx);
if (ctx.timer != null) {
clearTimeout(ctx.timer);
ctx.timer = null;
}
ctx.stopped = true;
if (ctx.running) {
return new Promise((r) => {
ctx.resolve = r;
});
}
return Promise.resolve();
}
/**
* Start a timeout.
* @param {Function} func
* @param {Number?} time
* @param {Object?} self
* @returns {Object}
*/
function startTimeout(func, time, self) {
return {
timer: setTimeout(func.bind(self), time),
stopped: false
};
}
/**
* Clear a timeout.
* @param {Object} ctx
*/
function stopTimeout(ctx) {
assert(ctx);
if (ctx.timer != null) {
clearTimeout(ctx.timer);
ctx.timer = null;
}
ctx.stopped = true;
}
/**
* Create a job object.
* @returns {Job}
*/
function job(resolve, reject) {
return new Job(resolve, reject);
}
/**
* Job
* @constructor
* @ignore
* @param {Function} resolve
* @param {Function} reject
* @property {Function} resolve
* @property {Function} reject
*/
function Job(resolve, reject) {
this.resolve = resolve;
this.reject = reject;
}
/*
* Expose
*/
exports = co;
exports.exec = exec;
exports.spawn = spawn;
exports.co = co;
exports.wait = wait;
exports.timeout = timeout;
exports.wrap = wrap;
exports.promisify = promisify;
exports.callbackify = callbackify;
exports.every = every;
exports.setInterval = startInterval;
exports.clearInterval = stopInterval;
exports.setTimeout = startTimeout;
exports.clearTimeout = stopTimeout;
exports.job = job;
module.exports = exports;

View File

@ -12,9 +12,8 @@
exports.AsyncEmitter = require('./asyncemitter');
exports.binary = require('./binary');
exports.co = require('./co');
exports.enforce = require('./enforce');
exports.fixed = require('./fixed');
exports.Heap = require('./heap');
exports.List = require('./list');
exports.Lock = require('./lock');
exports.LRU = require('./lru');

View File

@ -12,7 +12,6 @@
const assert = require('assert');
const EventEmitter = require('events');
const os = require('os');
const co = require('../utils/co');
const Network = require('../protocol/network');
const Child = require('./child');
const jobs = require('./jobs');
@ -601,7 +600,7 @@ Worker.prototype.killJobs = function killJobs() {
function PendingJob(worker, id, resolve, reject) {
this.worker = worker;
this.id = id;
this.job = co.job(resolve, reject);
this.job = { resolve, reject };
this.timer = null;
}

View File

@ -4,7 +4,6 @@ const assert = require('assert');
const BDB = require('bdb');
const encoding = require('bbuf/lib/encoding');
const networks = require('../lib/protocol/networks');
const co = require('../lib/utils/co');
const BufferWriter = require('bbuf/lib/writer');
const BufferReader = require('bbuf/lib/reader');
const OldCoins = require('./coins-old');
@ -76,8 +75,7 @@ async function checkTipIndex() {
if (keys.length < 3) {
console.log('Note: please run ensure-tip-index.js if you haven\'t yet.');
await co.timeout(2000);
return;
return new Promise(r => setTimeout(r, 2000));
}
}
@ -92,7 +90,7 @@ async function updateOptions() {
console.log('`--network [name]`, `--spv`, `--witness`,');
console.log('`--prune`, `--index-tx`, and `--index-address`.');
console.log('Continuing migration in 5 seconds...');
await co.timeout(5000);
await new Promise(r => setTimeout(r, 5000));
}
batch.put('O', defaultOptions());
@ -106,7 +104,7 @@ async function updateDeployments() {
console.log('Warning: no deployment table found.');
console.log('Make sure `--network` is set properly.');
console.log('Continuing migration in 5 seconds...');
await co.timeout(5000);
await new Promise(r => setTimeout(r, 5000));
}
batch.put('v', defaultDeployments());

View File

@ -18,7 +18,6 @@ if (process.argv.indexOf('-h') !== -1
const assert = require('assert');
const BDB = require('bdb');
const encoding = require('bbuf/lib/encoding');
const co = require('../lib/utils/co');
const digest = require('bcrypto/lib/digest');
const BN = require('bcrypto/lib/bn');
const StaticWriter = require('bbuf/lib/staticwriter');
@ -664,7 +663,7 @@ reserializeEntries;
console.log('Starting migration in 3 seconds...');
console.log('If you crash you can start over.');
await co.timeout(3000);
await new Promise(r => setTimeout(r, 3000));
let [state, hash] = await readJournal();

View File

@ -6,7 +6,6 @@
const assert = require('./util/assert');
const consensus = require('../lib/protocol/consensus');
const encoding = require('bbuf/lib/encoding');
const co = require('../lib/utils/co');
const Address = require('../lib/primitives/address');
const Script = require('../lib/script/script');
const Outpoint = require('../lib/primitives/outpoint');
@ -101,7 +100,7 @@ describe('HTTP', function() {
});
await wdb.addTX(tx);
await co.timeout(300);
await new Promise(r => setTimeout(r, 300));
assert(receive);
assert.strictEqual(receive.name, 'default');

View File

@ -5,7 +5,6 @@
const assert = require('./util/assert');
const consensus = require('../lib/protocol/consensus');
const co = require('../lib/utils/co');
const Coin = require('../lib/primitives/coin');
const Script = require('../lib/script/script');
const Opcode = require('../lib/script/opcode');
@ -125,7 +124,7 @@ describe('Node', function() {
assert(!await chain.isMainChain(tip2));
await co.wait();
await new Promise(setImmediate);
}
});
@ -136,7 +135,7 @@ describe('Node', function() {
});
it('should have correct balance', async () => {
await co.timeout(100);
await new Promise(r => setTimeout(r, 100));
const balance = await wallet.getBalance();
assert.strictEqual(balance.unconfirmed, 550 * 1e8);
@ -173,7 +172,7 @@ describe('Node', function() {
});
it('should have correct balance', async () => {
await co.timeout(100);
await new Promise(r => setTimeout(r, 100));
const balance = await wallet.getBalance();
assert.strictEqual(balance.unconfirmed, 1100 * 1e8);
@ -252,7 +251,7 @@ describe('Node', function() {
});
it('should get balance', async () => {
await co.timeout(100);
await new Promise(r => setTimeout(r, 100));
const balance = await wallet.getBalance();
assert.strictEqual(balance.unconfirmed, 1250 * 1e8);

View File

@ -3,7 +3,6 @@
const assert = require('assert');
const FullNode = require('../../lib/node/fullnode');
const Network = require('../../lib/protocol/network');
const co = require('../../lib/utils/co');
const Logger = require('blgr');
function NodeContext(network, size) {
@ -73,7 +72,7 @@ NodeContext.prototype.close = function close() {
NodeContext.prototype.connect = async function connect() {
for (const node of this.nodes) {
await node.connect();
await co.timeout(1000);
await new Promise(r => setTimeout(r, 1000));
}
};
@ -81,7 +80,7 @@ NodeContext.prototype.disconnect = async function disconnect() {
for (let i = this.nodes.length - 1; i >= 0; i--) {
const node = this.nodes[i];
await node.disconnect();
await co.timeout(1000);
await new Promise(r => setTimeout(r, 1000));
}
};
@ -118,7 +117,7 @@ NodeContext.prototype.height = function height(index) {
};
NodeContext.prototype.sync = async function sync() {
await co.timeout(3000);
return new Promise(r => setTimeout(r, 3000));
};
module.exports = NodeContext;