From 8332b93721e641566165264aa5fdd4730e4e9a0d Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 1 Nov 2017 14:12:18 -0700 Subject: [PATCH] utils: remove dependence on co. --- docs/Examples/client-api.js | 3 +- lib/blockchain/chain.js | 15 +- lib/mining/cpuminer.js | 3 +- lib/net/bip150.js | 3 +- lib/net/bip151.js | 3 +- lib/net/hostlist.js | 13 +- lib/net/peer.js | 5 +- lib/net/pool.js | 20 ++- lib/node/rpc.js | 3 +- lib/utils/co.js | 334 ------------------------------------ lib/utils/index.js | 3 +- lib/workers/workerpool.js | 3 +- migrate/chaindb1to2.js | 8 +- migrate/chaindb2to3.js | 3 +- test/http-test.js | 3 +- test/node-test.js | 9 +- test/util/node-context.js | 7 +- 17 files changed, 54 insertions(+), 384 deletions(-) delete mode 100644 lib/utils/co.js diff --git a/docs/Examples/client-api.js b/docs/Examples/client-api.js index 2dbf1d0e..3946366d 100644 --- a/docs/Examples/client-api.js +++ b/docs/Examples/client-api.js @@ -2,7 +2,6 @@ const bcoin = require('../..'); const encoding = bcoin.encoding; -const co = bcoin.co; const Outpoint = bcoin.outpoint; const MTX = bcoin.mtx; const HTTP = bcoin.http; @@ -50,7 +49,7 @@ async function fundWallet(wdb, addr) { }); await wdb.addTX(tx); - await co.timeout(300); + await new Promise(r => setTimeout(r, 300)); } async function sendTX(addr, value) { diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 14050888..fa74c7c4 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -22,7 +22,6 @@ const ChainEntry = require('./chainentry'); const CoinView = require('../coins/coinview'); const Script = require('../script/script'); const {VerifyError} = require('../protocol/errors'); -const co = require('../utils/co'); const encoding = require('bbuf/lib/encoding'); const thresholdStates = common.thresholdStates; @@ -817,11 +816,15 @@ Chain.prototype.verifyInputs = async function verifyInputs(block, prev, state) { } // Verify all txs in parallel. - if (!await co.every(jobs)) { - throw new VerifyError(block, - 'invalid', - 'mandatory-script-verify-flag-failed', - 100); + const results = await Promise.all(jobs); + + for (const result of results) { + if (!result) { + throw new VerifyError(block, + 'invalid', + 'mandatory-script-verify-flag-failed', + 100); + } } return view; diff --git a/lib/mining/cpuminer.js b/lib/mining/cpuminer.js index d5fd5bd8..5a4cebb6 100644 --- a/lib/mining/cpuminer.js +++ b/lib/mining/cpuminer.js @@ -10,7 +10,6 @@ const assert = require('assert'); const EventEmitter = require('events'); const util = require('../utils/util'); -const co = require('../utils/co'); const mine = require('./mine'); const Lock = require('../utils/lock'); const encoding = require('bbuf/lib/encoding'); @@ -237,7 +236,7 @@ CPUMiner.prototype._stop = async function _stop() { CPUMiner.prototype.wait = function wait() { return new Promise((resolve, reject) => { assert(!this.stopJob); - this.stopJob = co.job(resolve, reject); + this.stopJob = { resolve, reject }; }); }; diff --git a/lib/net/bip150.js b/lib/net/bip150.js index 8700ab8c..ba3207cb 100644 --- a/lib/net/bip150.js +++ b/lib/net/bip150.js @@ -23,7 +23,6 @@ const secp256k1 = require('bcrypto/lib/secp256k1'); const StaticWriter = require('bbuf/lib/staticwriter'); const encoding = require('bbuf/lib/encoding'); const base58 = require('bstr/lib/base58'); -const co = require('../utils/co'); const packets = require('./packets'); /** @@ -410,7 +409,7 @@ BIP150.prototype.wait = function wait(timeout) { BIP150.prototype._wait = function _wait(timeout, resolve, reject) { assert(!this.auth, 'Cannot wait for init after handshake.'); - this.job = co.job(resolve, reject); + this.job = { resolve, reject }; if (this.outbound && !this.peerIdentity) { this.reject(new Error(`No identity for ${this.hostname}.`)); diff --git a/lib/net/bip151.js b/lib/net/bip151.js index 62331104..2439c7e8 100644 --- a/lib/net/bip151.js +++ b/lib/net/bip151.js @@ -16,7 +16,6 @@ const assert = require('assert'); const EventEmitter = require('events'); const {format} = require('util'); const util = require('../utils/util'); -const co = require('../utils/co'); const hash256 = require('bcrypto/lib/hash256'); const sha256 = require('bcrypto/lib/sha256'); const ChaCha20 = require('bcrypto/lib/chacha20'); @@ -527,7 +526,7 @@ BIP151.prototype.wait = function wait(timeout) { BIP151.prototype._wait = function _wait(timeout, resolve, reject) { assert(!this.handshake, 'Cannot wait for init after handshake.'); - this.job = co.job(resolve, reject); + this.job = { resolve, reject }; this.timeout = setTimeout(() => { this.reject(new Error('BIP151 handshake timed out.')); diff --git a/lib/net/hostlist.js b/lib/net/hostlist.js index 2854da45..5c988ad7 100644 --- a/lib/net/hostlist.js +++ b/lib/net/hostlist.js @@ -14,7 +14,6 @@ const dns = require('bdns'); const Logger = require('blgr'); const murmur3 = require('bfilter/lib/murmur3'); const util = require('../utils/util'); -const co = require('../utils/co'); const Network = require('../protocol/network'); const NetAddress = require('../primitives/netaddress'); const List = require('../utils/list'); @@ -53,6 +52,7 @@ function HostList(options) { this.timer = null; this.needsFlush = false; + this.flushing = false; this.init(); } @@ -201,7 +201,7 @@ HostList.prototype.start = function start() { return; assert(this.timer == null); - this.timer = co.setInterval(this.flush, this.options.flushInterval, this); + this.timer = setInterval(() => this.flush(), this.options.flushInterval); }; /** @@ -216,7 +216,7 @@ HostList.prototype.stop = function stop() { return; assert(this.timer != null); - co.clearInterval(this.timer); + clearInterval(this.timer); this.timer = null; }; @@ -298,6 +298,9 @@ HostList.prototype.flush = async function flush() { if (!this.needsFlush) return; + if (this.flushing) + return; + this.needsFlush = false; this.logger.debug('Writing hosts to %s.', filename); @@ -305,12 +308,16 @@ HostList.prototype.flush = async function flush() { const json = this.toJSON(); const data = JSON.stringify(json); + this.flushing = true; + try { await fs.writeFile(filename, data, 'utf8'); } catch (e) { this.logger.warning('Writing hosts failed.'); this.logger.error(e); } + + this.flushing = false; }; /** diff --git a/lib/net/peer.js b/lib/net/peer.js index b8ef7cbf..ae475074 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -14,7 +14,6 @@ const tcp = require('btcp'); const Logger = require('blgr'); const encoding = require('bbuf/lib/encoding'); const RollingFilter = require('bfilter/lib/rolling'); -const co = require('../utils/co'); const Parser = require('./parser'); const Framer = require('./framer'); const packets = require('./packets'); @@ -1141,7 +1140,7 @@ Peer.prototype.drain = function drain() { return Promise.resolve(); return new Promise((resolve, reject) => { - this.drainQueue.push(co.job(resolve, reject)); + this.drainQueue.push({ resolve, reject }); }); }; @@ -2435,7 +2434,7 @@ function RequestEntry() { } RequestEntry.prototype.addJob = function addJob(resolve, reject) { - this.jobs.push(co.job(resolve, reject)); + this.jobs.push({ resolve, reject }); }; RequestEntry.prototype.setTimeout = function setTimeout(timeout) { diff --git a/lib/net/pool.js b/lib/net/pool.js index d60d1872..66564863 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -19,7 +19,6 @@ const RollingFilter = require('bfilter/lib/rolling'); const secp256k1 = require('bcrypto/lib/secp256k1'); const encoding = require('bbuf/lib/encoding'); const util = require('../utils/util'); -const co = require('../utils/co'); const common = require('./common'); const chainCommon = require('../blockchain/common'); const Address = require('../primitives/address'); @@ -93,6 +92,7 @@ function Pool(options) { this.connected = false; this.disconnecting = false; this.syncing = false; + this.discovering = false; this.spvFilter = null; this.txFilter = null; this.blockMap = new Set(); @@ -411,7 +411,7 @@ Pool.prototype.unlisten = async function unlisten() { Pool.prototype.startTimer = function startTimer() { assert(this.timer == null, 'Timer already started.'); - this.timer = co.setInterval(this.discover, Pool.DISCOVERY_INTERVAL, this); + this.timer = setInterval(() => this.discover(), Pool.DISCOVERY_INTERVAL); }; /** @@ -421,7 +421,7 @@ Pool.prototype.startTimer = function startTimer() { Pool.prototype.stopTimer = function stopTimer() { assert(this.timer != null, 'Timer already stopped.'); - co.clearInterval(this.timer); + clearInterval(this.timer); this.timer = null; }; @@ -432,8 +432,16 @@ Pool.prototype.stopTimer = function stopTimer() { */ Pool.prototype.discover = async function discover() { - await this.discoverGateway(); - await this.discoverSeeds(true); + if (this.discovering) + return; + + try { + this.discovering = true; + await this.discoverGateway(); + await this.discoverSeeds(true); + } finally { + this.discovering = false; + } }; /** @@ -4202,7 +4210,7 @@ Object.setPrototypeOf(BroadcastItem.prototype, EventEmitter.prototype); */ BroadcastItem.prototype.addJob = function addJob(resolve, reject) { - this.jobs.push(co.job(resolve, reject)); + this.jobs.push({ resolve, reject }); }; /** diff --git a/lib/node/rpc.js b/lib/node/rpc.js index 74080077..63746d8c 100644 --- a/lib/node/rpc.js +++ b/lib/node/rpc.js @@ -9,7 +9,6 @@ const assert = require('assert'); const bweb = require('bweb'); const util = require('../utils/util'); -const co = require('../utils/co'); const digest = require('bcrypto/lib/digest'); const ccmp = require('bcrypto/lib/ccmp'); const common = require('../blockchain/common'); @@ -2296,7 +2295,7 @@ class RPC extends RPCBase { longpoll() { return new Promise((resolve, reject) => { - this.pollers.push(co.job(resolve, reject)); + this.pollers.push({ resolve, reject }); }); } diff --git a/lib/utils/co.js b/lib/utils/co.js deleted file mode 100644 index 7fb6fff7..00000000 --- a/lib/utils/co.js +++ /dev/null @@ -1,334 +0,0 @@ -/*! - * co.js - promise and generator control flow for bcoin - * Originally based on yoursnetwork's "asink" module. - * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). - * https://github.com/bcoin-org/bcoin - */ - -'use strict'; - -/** - * @module utils/co - */ - -const assert = require('assert'); - -/** - * Execute an instantiated generator. - * @param {Generator} gen - * @returns {Promise} - */ - -function exec(gen) { - return new Promise((resolve, reject) => { - const step = (value, rejection) => { - let next; - - try { - if (rejection) - next = gen.throw(value); - else - next = gen.next(value); - } catch (e) { - reject(e); - return; - } - - if (next.done) { - resolve(next.value); - return; - } - - if (!isPromise(next.value)) { - step(next.value, false); - return; - } - - // eslint-disable-next-line no-use-before-define - next.value.then(succeed, fail); - }; - - const succeed = (value) => { - step(value, false); - }; - - const fail = (value) => { - step(value, true); - }; - - step(undefined, false); - }); -} - -/** - * Execute generator function - * with a context and execute. - * @param {GeneratorFunction} generator - * @param {Object?} self - * @returns {Promise} - */ - -function spawn(generator, self) { - const gen = generator.call(self); - return exec(gen); -} - -/** - * Wrap a generator function to be - * executed into a function that - * returns a promise. - * @param {GeneratorFunction} - * @returns {Function} - */ - -function co(generator) { - return function() { - const gen = generator.apply(this, arguments); - return exec(gen); - }; -} - -/** - * Test whether an object is a promise. - * @param {Object} obj - * @returns {Boolean} - */ - -function isPromise(obj) { - return obj && typeof obj.then === 'function'; -} - -/** - * Wait for a nextTick with a promise. - * @returns {Promise} - */ - -function wait() { - return new Promise(resolve => setImmediate(resolve)); -}; - -/** - * Wait for a timeout with a promise. - * @param {Number} time - * @returns {Promise} - */ - -function timeout(time) { - return new Promise(resolve => setTimeout(resolve, time)); -} - -/** - * Wrap `resolve` and `reject` into - * a node.js style callback. - * @param {Function} resolve - * @param {Function} reject - * @returns {Function} - */ - -function wrap(resolve, reject) { - return function(err, result) { - if (err) { - reject(err); - return; - } - resolve(result); - }; -} - -/** - * Wrap a function that accepts node.js - * style callbacks into a function that - * returns a promise. - * @param {Function} func - * @returns {AsyncFunction} - */ - -function promisify(func) { - return function(...args) { - return new Promise((resolve, reject) => { - args.push(wrap(resolve, reject)); - func.call(this, ...args); - }); - }; -} - -/** - * Wrap a promise-returning function - * into a function that accepts a - * node.js style callback. - * @param {AsyncFunction} func - * @returns {Function} - */ - -function callbackify(func) { - return function(...args) { - if (args.length === 0 - || typeof args[args.length - 1] !== 'function') { - throw new Error(`${func.name || 'Function'} requires a callback.`); - } - - const callback = args.pop(); - - func.call(this, ...args).then((value) => { - setImmediate(() => callback(null, value)); - }, (err) => { - setImmediate(() => callback(err)); - }); - }; -} - -/** - * Execute each promise and - * have them pass a truth test. - * @method - * @param {Promise[]} jobs - * @returns {Promise} - */ - -async function every(jobs) { - const result = await Promise.all(jobs); - - for (const item of result) { - if (!item) - return false; - } - - return true; -} - -/** - * Start an interval. Wait for promise - * to resolve on each iteration. - * @param {Function} func - * @param {Number?} time - * @param {Object?} self - * @returns {Object} - */ - -function startInterval(func, time, self) { - const ctx = { - timer: null, - stopped: false, - running: false, - resolve: null - }; - - const cb = async () => { - assert(ctx.timer != null); - ctx.timer = null; - - try { - ctx.running = true; - await func.call(self); - } finally { - ctx.running = false; - if (!ctx.stopped) - ctx.timer = setTimeout(cb, time); - else if (ctx.resolve) - ctx.resolve(); - } - }; - - ctx.timer = setTimeout(cb, time); - - return ctx; -} - -/** - * Clear an interval. - * @param {Object} ctx - */ - -function stopInterval(ctx) { - assert(ctx); - - if (ctx.timer != null) { - clearTimeout(ctx.timer); - ctx.timer = null; - } - - ctx.stopped = true; - - if (ctx.running) { - return new Promise((r) => { - ctx.resolve = r; - }); - } - - return Promise.resolve(); -} - -/** - * Start a timeout. - * @param {Function} func - * @param {Number?} time - * @param {Object?} self - * @returns {Object} - */ - -function startTimeout(func, time, self) { - return { - timer: setTimeout(func.bind(self), time), - stopped: false - }; -} - -/** - * Clear a timeout. - * @param {Object} ctx - */ - -function stopTimeout(ctx) { - assert(ctx); - if (ctx.timer != null) { - clearTimeout(ctx.timer); - ctx.timer = null; - } - ctx.stopped = true; -} - -/** - * Create a job object. - * @returns {Job} - */ - -function job(resolve, reject) { - return new Job(resolve, reject); -} - -/** - * Job - * @constructor - * @ignore - * @param {Function} resolve - * @param {Function} reject - * @property {Function} resolve - * @property {Function} reject - */ - -function Job(resolve, reject) { - this.resolve = resolve; - this.reject = reject; -} - -/* - * Expose - */ - -exports = co; -exports.exec = exec; -exports.spawn = spawn; -exports.co = co; -exports.wait = wait; -exports.timeout = timeout; -exports.wrap = wrap; -exports.promisify = promisify; -exports.callbackify = callbackify; -exports.every = every; -exports.setInterval = startInterval; -exports.clearInterval = stopInterval; -exports.setTimeout = startTimeout; -exports.clearTimeout = stopTimeout; -exports.job = job; - -module.exports = exports; diff --git a/lib/utils/index.js b/lib/utils/index.js index fd7d98c4..4f8aa8fd 100644 --- a/lib/utils/index.js +++ b/lib/utils/index.js @@ -12,9 +12,8 @@ exports.AsyncEmitter = require('./asyncemitter'); exports.binary = require('./binary'); -exports.co = require('./co'); -exports.enforce = require('./enforce'); exports.fixed = require('./fixed'); +exports.Heap = require('./heap'); exports.List = require('./list'); exports.Lock = require('./lock'); exports.LRU = require('./lru'); diff --git a/lib/workers/workerpool.js b/lib/workers/workerpool.js index 77655172..9671d0ff 100644 --- a/lib/workers/workerpool.js +++ b/lib/workers/workerpool.js @@ -12,7 +12,6 @@ const assert = require('assert'); const EventEmitter = require('events'); const os = require('os'); -const co = require('../utils/co'); const Network = require('../protocol/network'); const Child = require('./child'); const jobs = require('./jobs'); @@ -601,7 +600,7 @@ Worker.prototype.killJobs = function killJobs() { function PendingJob(worker, id, resolve, reject) { this.worker = worker; this.id = id; - this.job = co.job(resolve, reject); + this.job = { resolve, reject }; this.timer = null; } diff --git a/migrate/chaindb1to2.js b/migrate/chaindb1to2.js index 86f0b01d..0526132f 100644 --- a/migrate/chaindb1to2.js +++ b/migrate/chaindb1to2.js @@ -4,7 +4,6 @@ const assert = require('assert'); const BDB = require('bdb'); const encoding = require('bbuf/lib/encoding'); const networks = require('../lib/protocol/networks'); -const co = require('../lib/utils/co'); const BufferWriter = require('bbuf/lib/writer'); const BufferReader = require('bbuf/lib/reader'); const OldCoins = require('./coins-old'); @@ -76,8 +75,7 @@ async function checkTipIndex() { if (keys.length < 3) { console.log('Note: please run ensure-tip-index.js if you haven\'t yet.'); - await co.timeout(2000); - return; + return new Promise(r => setTimeout(r, 2000)); } } @@ -92,7 +90,7 @@ async function updateOptions() { console.log('`--network [name]`, `--spv`, `--witness`,'); console.log('`--prune`, `--index-tx`, and `--index-address`.'); console.log('Continuing migration in 5 seconds...'); - await co.timeout(5000); + await new Promise(r => setTimeout(r, 5000)); } batch.put('O', defaultOptions()); @@ -106,7 +104,7 @@ async function updateDeployments() { console.log('Warning: no deployment table found.'); console.log('Make sure `--network` is set properly.'); console.log('Continuing migration in 5 seconds...'); - await co.timeout(5000); + await new Promise(r => setTimeout(r, 5000)); } batch.put('v', defaultDeployments()); diff --git a/migrate/chaindb2to3.js b/migrate/chaindb2to3.js index 3b9b16b1..31a4d423 100644 --- a/migrate/chaindb2to3.js +++ b/migrate/chaindb2to3.js @@ -18,7 +18,6 @@ if (process.argv.indexOf('-h') !== -1 const assert = require('assert'); const BDB = require('bdb'); const encoding = require('bbuf/lib/encoding'); -const co = require('../lib/utils/co'); const digest = require('bcrypto/lib/digest'); const BN = require('bcrypto/lib/bn'); const StaticWriter = require('bbuf/lib/staticwriter'); @@ -664,7 +663,7 @@ reserializeEntries; console.log('Starting migration in 3 seconds...'); console.log('If you crash you can start over.'); - await co.timeout(3000); + await new Promise(r => setTimeout(r, 3000)); let [state, hash] = await readJournal(); diff --git a/test/http-test.js b/test/http-test.js index 3ca44137..332b8f9d 100644 --- a/test/http-test.js +++ b/test/http-test.js @@ -6,7 +6,6 @@ const assert = require('./util/assert'); const consensus = require('../lib/protocol/consensus'); const encoding = require('bbuf/lib/encoding'); -const co = require('../lib/utils/co'); const Address = require('../lib/primitives/address'); const Script = require('../lib/script/script'); const Outpoint = require('../lib/primitives/outpoint'); @@ -101,7 +100,7 @@ describe('HTTP', function() { }); await wdb.addTX(tx); - await co.timeout(300); + await new Promise(r => setTimeout(r, 300)); assert(receive); assert.strictEqual(receive.name, 'default'); diff --git a/test/node-test.js b/test/node-test.js index 1bd77657..19e5895b 100644 --- a/test/node-test.js +++ b/test/node-test.js @@ -5,7 +5,6 @@ const assert = require('./util/assert'); const consensus = require('../lib/protocol/consensus'); -const co = require('../lib/utils/co'); const Coin = require('../lib/primitives/coin'); const Script = require('../lib/script/script'); const Opcode = require('../lib/script/opcode'); @@ -125,7 +124,7 @@ describe('Node', function() { assert(!await chain.isMainChain(tip2)); - await co.wait(); + await new Promise(setImmediate); } }); @@ -136,7 +135,7 @@ describe('Node', function() { }); it('should have correct balance', async () => { - await co.timeout(100); + await new Promise(r => setTimeout(r, 100)); const balance = await wallet.getBalance(); assert.strictEqual(balance.unconfirmed, 550 * 1e8); @@ -173,7 +172,7 @@ describe('Node', function() { }); it('should have correct balance', async () => { - await co.timeout(100); + await new Promise(r => setTimeout(r, 100)); const balance = await wallet.getBalance(); assert.strictEqual(balance.unconfirmed, 1100 * 1e8); @@ -252,7 +251,7 @@ describe('Node', function() { }); it('should get balance', async () => { - await co.timeout(100); + await new Promise(r => setTimeout(r, 100)); const balance = await wallet.getBalance(); assert.strictEqual(balance.unconfirmed, 1250 * 1e8); diff --git a/test/util/node-context.js b/test/util/node-context.js index 984bcdaa..ddf65bc1 100644 --- a/test/util/node-context.js +++ b/test/util/node-context.js @@ -3,7 +3,6 @@ const assert = require('assert'); const FullNode = require('../../lib/node/fullnode'); const Network = require('../../lib/protocol/network'); -const co = require('../../lib/utils/co'); const Logger = require('blgr'); function NodeContext(network, size) { @@ -73,7 +72,7 @@ NodeContext.prototype.close = function close() { NodeContext.prototype.connect = async function connect() { for (const node of this.nodes) { await node.connect(); - await co.timeout(1000); + await new Promise(r => setTimeout(r, 1000)); } }; @@ -81,7 +80,7 @@ NodeContext.prototype.disconnect = async function disconnect() { for (let i = this.nodes.length - 1; i >= 0; i--) { const node = this.nodes[i]; await node.disconnect(); - await co.timeout(1000); + await new Promise(r => setTimeout(r, 1000)); } }; @@ -118,7 +117,7 @@ NodeContext.prototype.height = function height(index) { }; NodeContext.prototype.sync = async function sync() { - await co.timeout(3000); + return new Promise(r => setTimeout(r, 3000)); }; module.exports = NodeContext;