From 424370bc1f50b6b56de468200cb7e28f7eba0545 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 1 Nov 2017 12:57:11 -0700 Subject: [PATCH] utils: stop using asyncobject. --- lib/blockchain/chain.js | 32 ++--- lib/blockchain/chaindb.js | 4 +- lib/mempool/mempool.js | 22 ++-- lib/mining/cpuminer.js | 25 ++-- lib/mining/miner.js | 30 ++--- lib/net/bip150.js | 19 +-- lib/net/hostlist.js | 4 +- lib/net/peer.js | 10 +- lib/net/pool.js | 77 ++++++------ lib/net/proxysocket.js | 4 +- lib/net/socks.js | 98 +++++++-------- lib/net/upnp.js | 30 ++--- lib/node/fullnode.js | 36 ++++-- lib/node/node.js | 35 ++---- lib/node/spvnode.js | 30 +++-- lib/protocol/network.js | 4 +- lib/utils/asyncemitter.js | 46 +------ lib/utils/asyncobject.js | 252 -------------------------------------- lib/utils/index.js | 2 - lib/wallet/nodeclient.js | 53 +++++--- lib/wallet/nullclient.js | 170 +++++++++++++++++++++++++ lib/wallet/plugin.js | 2 +- lib/wallet/server.js | 24 +++- lib/wallet/walletdb.js | 30 +++-- test/mempool-test.js | 4 + 25 files changed, 473 insertions(+), 570 deletions(-) delete mode 100644 lib/utils/asyncobject.js create mode 100644 lib/wallet/nullclient.js diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 76829369..14050888 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -9,7 +9,7 @@ const assert = require('assert'); const path = require('path'); -const AsyncObject = require('../utils/asyncobject'); +const AsyncEmitter = require('../utils/asyncemitter'); const Network = require('../protocol/network'); const Logger = require('blgr'); const ChainDB = require('./chaindb'); @@ -65,8 +65,9 @@ function Chain(options) { if (!(this instanceof Chain)) return new Chain(options); - AsyncObject.call(this); + AsyncEmitter.call(this); + this.opened = false; this.options = new ChainOptions(options); this.network = this.options.network; @@ -87,7 +88,7 @@ function Chain(options) { this.orphanPrev = new Map(); } -Object.setPrototypeOf(Chain.prototype, AsyncObject.prototype); +Object.setPrototypeOf(Chain.prototype, AsyncEmitter.prototype); /** * Size of set to pick median time from. @@ -99,11 +100,13 @@ Chain.MEDIAN_TIMESPAN = 11; /** * Open the chain, wait for the database to load. - * @alias Chain#open * @returns {Promise} */ -Chain.prototype._open = async function _open() { +Chain.prototype.open = async function open() { + assert(!this.opened, 'Chain is already open.'); + this.opened = true; + this.logger.info('Chain is loading.'); if (this.options.checkpoints) @@ -144,11 +147,12 @@ Chain.prototype._open = async function _open() { /** * Close the chain, wait for the database to close. - * @alias Chain#close * @returns {Promise} */ -Chain.prototype._close = function _close() { +Chain.prototype.close = async function close() { + assert(this.opened, 'Chain is not open.'); + this.opened = false; return this.db.close(); }; @@ -906,7 +910,7 @@ Chain.prototype.reorganize = async function reorganize(competitor) { competitor.height ); - await this.call('reorganize', tip, competitor); + await this.emitAsync('reorganize', tip, competitor); }; /** @@ -943,7 +947,7 @@ Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor) { for (const entry of disconnect) { const headers = entry.toHeaders(); const view = new CoinView(); - await this.call('disconnect', entry, headers, view); + await this.emitAsync('disconnect', entry, headers, view); } this.logger.warning( @@ -958,7 +962,7 @@ Chain.prototype.reorganizeSPV = async function reorganizeSPV(competitor) { 'Chain replay from height %d necessary.', fork.height); - await this.call('reorganize', tip, competitor); + await this.emitAsync('reorganize', tip, competitor); }; /** @@ -986,7 +990,7 @@ Chain.prototype.disconnect = async function disconnect(entry) { this.emit('tip', prev); - await this.call('disconnect', entry, block, view); + await this.emitAsync('disconnect', entry, block, view); }; /** @@ -1036,7 +1040,7 @@ Chain.prototype.reconnect = async function reconnect(entry) { this.emit('tip', entry); this.emit('reconnect', entry, block); - await this.call('connect', entry, block, view); + await this.emitAsync('connect', entry, block, view); }; /** @@ -1104,7 +1108,7 @@ Chain.prototype.setBestChain = async function setBestChain(entry, block, prev, f this.emit('tip', entry); this.emit('block', block, entry); - await this.call('connect', entry, block, view); + await this.emitAsync('connect', entry, block, view); }; /** @@ -1199,7 +1203,7 @@ Chain.prototype._reset = async function _reset(block, silent) { this.emit('tip', tip); if (!silent) - await this.call('reset', tip); + await this.emitAsync('reset', tip); // Reset the orphan map completely. There may // have been some orphans on a forked chain we diff --git a/lib/blockchain/chaindb.js b/lib/blockchain/chaindb.js index f590e03d..1ff6840c 100644 --- a/lib/blockchain/chaindb.js +++ b/lib/blockchain/chaindb.js @@ -2201,10 +2201,10 @@ function StateCache(network) { this.network = network; this.bits = []; this.updates = []; - this._init(); + this.init(); } -StateCache.prototype._init = function _init() { +StateCache.prototype.init = function init() { for (let i = 0; i < 32; i++) this.bits.push(null); diff --git a/lib/mempool/mempool.js b/lib/mempool/mempool.js index d7cc66a1..ed80e895 100644 --- a/lib/mempool/mempool.js +++ b/lib/mempool/mempool.js @@ -8,8 +8,8 @@ const assert = require('assert'); const path = require('path'); +const EventEmitter = require('events'); const BDB = require('bdb'); -const AsyncObject = require('../utils/asyncobject'); const common = require('../blockchain/common'); const consensus = require('../protocol/consensus'); const policy = require('../protocol/policy'); @@ -65,8 +65,9 @@ function Mempool(options) { if (!(this instanceof Mempool)) return new Mempool(options); - AsyncObject.call(this); + EventEmitter.call(this); + this.opened = false; this.options = new MempoolOptions(options); this.network = this.options.network; @@ -95,17 +96,17 @@ function Mempool(options) { this.txIndex = new TXIndex(); } -Object.setPrototypeOf(Mempool.prototype, AsyncObject.prototype); +Object.setPrototypeOf(Mempool.prototype, EventEmitter.prototype); /** * Open the chain, wait for the database to load. - * @method - * @alias Mempool#open * @returns {Promise} */ -Mempool.prototype._open = async function _open() { - await this.chain.open(); +Mempool.prototype.open = async function open() { + assert(!this.opened, 'Mempool is already open.'); + this.opened = true; + await this.cache.open(); if (this.options.persistent) { @@ -148,12 +149,13 @@ Mempool.prototype._open = async function _open() { /** * Close the chain, wait for the database to close. - * @alias Mempool#close * @returns {Promise} */ -Mempool.prototype._close = async function _close() { - await this.cache.close(); +Mempool.prototype.close = async function close() { + assert(this.opened, 'Mempool is not open.'); + this.opened = false; + return this.cache.close(); }; /** diff --git a/lib/mining/cpuminer.js b/lib/mining/cpuminer.js index 46fdef24..d5fd5bd8 100644 --- a/lib/mining/cpuminer.js +++ b/lib/mining/cpuminer.js @@ -8,9 +8,9 @@ 'use strict'; const assert = require('assert'); +const EventEmitter = require('events'); const util = require('../utils/util'); const co = require('../utils/co'); -const AsyncObject = require('../utils/asyncobject'); const mine = require('./mine'); const Lock = require('../utils/lock'); const encoding = require('bbuf/lib/encoding'); @@ -28,8 +28,9 @@ function CPUMiner(miner) { if (!(this instanceof CPUMiner)) return new CPUMiner(miner); - AsyncObject.call(this); + EventEmitter.call(this); + this.opened = false; this.miner = miner; this.network = this.miner.network; this.logger = this.miner.logger.context('cpuminer'); @@ -42,10 +43,10 @@ function CPUMiner(miner) { this.job = null; this.stopJob = null; - this._init(); + this.init(); } -Object.setPrototypeOf(CPUMiner.prototype, AsyncObject.prototype); +Object.setPrototypeOf(CPUMiner.prototype, EventEmitter.prototype); /** * Nonce range interval. @@ -60,7 +61,7 @@ CPUMiner.INTERVAL = 0xffffffff / 1500 | 0; * @private */ -CPUMiner.prototype._init = function _init() { +CPUMiner.prototype.init = function init() { this.chain.on('tip', (tip) => { if (!this.job) return; @@ -72,23 +73,23 @@ CPUMiner.prototype._init = function _init() { /** * Open the miner. - * @method - * @alias module:mining.CPUMiner#open * @returns {Promise} */ -CPUMiner.prototype._open = async function _open() { +CPUMiner.prototype.open = async function open() { + assert(!this.opened, 'CPUMiner is already open.'); + this.opened = true; }; /** * Close the miner. - * @method - * @alias module:mining.CPUMiner#close * @returns {Promise} */ -CPUMiner.prototype._close = async function _close() { - await this.stop(); +CPUMiner.prototype.close = async function close() { + assert(this.opened, 'CPUMiner is not open.'); + this.opened = false; + return this.stop(); }; /** diff --git a/lib/mining/miner.js b/lib/mining/miner.js index 4a93c948..80f0619b 100644 --- a/lib/mining/miner.js +++ b/lib/mining/miner.js @@ -8,8 +8,8 @@ 'use strict'; const assert = require('assert'); +const EventEmitter = require('events'); const Heap = require('../utils/heap'); -const AsyncObject = require('../utils/asyncobject'); const Amount = require('../btc/amount'); const Address = require('../primitives/address'); const BlockTemplate = require('./template'); @@ -30,8 +30,9 @@ function Miner(options) { if (!(this instanceof Miner)) return new Miner(options); - AsyncObject.call(this); + EventEmitter.call(this); + this.opened = false; this.options = new MinerOptions(options); this.network = this.options.network; this.logger = this.options.logger.context('miner'); @@ -45,13 +46,10 @@ function Miner(options) { this.init(); } -Object.setPrototypeOf(Miner.prototype, AsyncObject.prototype); +Object.setPrototypeOf(Miner.prototype, EventEmitter.prototype); /** - * Open the miner, wait for the chain and mempool to load. - * @method - * @alias module:mining.Miner#open - * @returns {Promise} + * Initialize the miner. */ Miner.prototype.init = function init() { @@ -62,16 +60,12 @@ Miner.prototype.init = function init() { /** * Open the miner, wait for the chain and mempool to load. - * @method - * @alias module:mining.Miner#open * @returns {Promise} */ -Miner.prototype._open = async function _open() { - await this.chain.open(); - - if (this.mempool) - await this.mempool.open(); +Miner.prototype.open = async function open() { + assert(!this.opened, 'Miner is already open.'); + this.opened = true; await this.cpu.open(); @@ -84,13 +78,13 @@ Miner.prototype._open = async function _open() { /** * Close the miner. - * @method - * @alias module:mining.Miner#close * @returns {Promise} */ -Miner.prototype._close = async function _close() { - await this.cpu.close(); +Miner.prototype.close = async function close() { + assert(this.opened, 'Miner is not open.'); + this.opened = false; + return this.cpu.close(); }; /** diff --git a/lib/net/bip150.js b/lib/net/bip150.js index e3be95a9..b0c7f48a 100644 --- a/lib/net/bip150.js +++ b/lib/net/bip150.js @@ -15,7 +15,8 @@ const fs = require('bfile'); const IP = require('binet'); const Logger = require('blgr'); const ccmp = require('bcrypto/lib/ccmp'); -const digest = require('bcrypto/lib/digest'); +const hash160 = require('bcrypto/lib/hash160'); +const hash256 = require('bcrypto/lib/hash256'); const random = require('bcrypto/lib/random'); const secp256k1 = require('bcrypto/lib/secp256k1'); const StaticWriter = require('bbuf/lib/staticwriter'); @@ -83,7 +84,7 @@ function BIP150(bip151, host, outbound, db, key) { this.timeout = null; this.onAuth = null; - this._init(); + this.init(); } Object.setPrototypeOf(BIP150.prototype, EventEmitter.prototype); @@ -93,7 +94,7 @@ Object.setPrototypeOf(BIP150.prototype, EventEmitter.prototype); * @private */ -BIP150.prototype._init = function _init() { +BIP150.prototype.init = function init() { if (this.outbound) this.peerIdentity = this.db.getKnown(this.hostname); }; @@ -252,7 +253,7 @@ BIP150.prototype.rekey = function rekey(sid, key, req, res) { key.copy(seed, 32); req.copy(seed, 64); res.copy(seed, 97); - return digest.hash256(seed); + return hash256.digest(seed); }; /** @@ -296,7 +297,7 @@ BIP150.prototype.hash = function hash(sid, ch, key) { sid.copy(data, 0); data[32] = ch.charCodeAt(0); key.copy(data, 33); - return digest.hash256(data); + return hash256.digest(data); }; /** @@ -445,8 +446,8 @@ BIP150.address = function address(key) { const bw = new StaticWriter(27); bw.writeU8(0x0f); bw.writeU16BE(0xff01); - bw.writeBytes(digest.hash160(key)); - bw.writeChecksum(digest.hash256); + bw.writeBytes(hash160.digest(key)); + bw.writeChecksum(hash256.digest); return base58.encode(bw.render()); }; @@ -468,7 +469,7 @@ function AuthDB(options) { this.known = new Map(); this.authorized = []; - this._init(options); + this.init(options); } /** @@ -476,7 +477,7 @@ function AuthDB(options) { * @param {Object} options */ -AuthDB.prototype._init = function _init(options) { +AuthDB.prototype.init = function init(options) { if (!options) return; diff --git a/lib/net/hostlist.js b/lib/net/hostlist.js index 918e84f9..9e51a64d 100644 --- a/lib/net/hostlist.js +++ b/lib/net/hostlist.js @@ -54,7 +54,7 @@ function HostList(options) { this.timer = null; this.needsFlush = false; - this._init(); + this.init(); } /** @@ -133,7 +133,7 @@ HostList.scores = { * @private */ -HostList.prototype._init = function _init() { +HostList.prototype.init = function init() { const options = this.options; const scores = HostList.scores; const hosts = IP.getPublic(); diff --git a/lib/net/peer.js b/lib/net/peer.js index 3be13f32..50061d70 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -143,7 +143,7 @@ function Peer(options) { this.responseMap = new Map(); this.compactBlocks = new Map(); - this._init(); + this.init(); } Object.setPrototypeOf(Peer.prototype, EventEmitter.prototype); @@ -262,7 +262,7 @@ Peer.fromOptions = function fromOptions(options) { * @private */ -Peer.prototype._init = function _init() { +Peer.prototype.init = function init() { this.parser.on('packet', async (packet) => { try { await this.readPacket(packet); @@ -377,7 +377,7 @@ Peer.prototype.setAuth = function setAuth(db, key) { * @param {net.Socket} socket */ -Peer.prototype.bind = function bind(socket) { +Peer.prototype._bind = function _bind(socket) { assert(!this.socket); this.socket = socket; @@ -422,7 +422,7 @@ Peer.prototype.accept = function accept(socket) { this.outbound = false; this.connected = true; - this.bind(socket); + this._bind(socket); return socket; }; @@ -443,7 +443,7 @@ Peer.prototype.connect = function connect(addr) { this.outbound = true; this.connected = false; - this.bind(socket); + this._bind(socket); return socket; }; diff --git a/lib/net/pool.js b/lib/net/pool.js index 8a8639c9..9d88cdd5 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -9,7 +9,6 @@ const assert = require('assert'); const EventEmitter = require('events'); -const AsyncObject = require('../utils/asyncobject'); const util = require('../utils/util'); const IP = require('binet'); const co = require('../utils/co'); @@ -29,6 +28,7 @@ const external = require('./external'); const List = require('../utils/list'); const tcp = require('./tcp'); const dns = require('./dns'); +const socks = require('./socks'); const HostList = require('./hostlist'); const UPNP = require('./upnp'); const InvItem = require('../primitives/invitem'); @@ -77,8 +77,9 @@ function Pool(options) { if (!(this instanceof Pool)) return new Pool(options); - AsyncObject.call(this); + EventEmitter.call(this); + this.opened = false; this.options = new PoolOptions(options); this.network = this.options.network; @@ -117,10 +118,10 @@ function Pool(options) { if (!this.options.mempool) this.txFilter = new RollingFilter(50000, 0.000001); - this._init(); + this.init(); }; -Object.setPrototypeOf(Pool.prototype, AsyncObject.prototype); +Object.setPrototypeOf(Pool.prototype, EventEmitter.prototype); /** * Discovery interval for UPNP and DNS seeds. @@ -135,7 +136,7 @@ Pool.DISCOVERY_INTERVAL = 120000; * @private */ -Pool.prototype._init = function _init() { +Pool.prototype.init = function init() { this.server.on('error', (err) => { this.emit('error', err); }); @@ -206,16 +207,12 @@ Pool.prototype._init = function _init() { /** * Open the pool, wait for the chain to load. - * @method - * @alias Pool#open * @returns {Promise} */ -Pool.prototype._open = async function _open() { - if (this.mempool) - await this.mempool.open(); - else - await this.chain.open(); +Pool.prototype.open = async function open() { + assert(!this.opened, 'Pool is already open.'); + this.opened = true; this.logger.info('Pool loaded (maxpeers=%d).', this.options.maxOutbound); @@ -228,6 +225,19 @@ Pool.prototype._open = async function _open() { this.resetChain(); }; +/** + * Close and destroy the pool. + * @method + * @alias Pool#close + * @returns {Promise} + */ + +Pool.prototype.close = async function close() { + assert(this.opened, 'Pool is not open.'); + this.opened = false; + return this.disconnect(); +}; + /** * Reset header chain. */ @@ -253,17 +263,6 @@ Pool.prototype.resetChain = function resetChain() { } }; -/** - * Close and destroy the pool. - * @method - * @alias Pool#close - * @returns {Promise} - */ - -Pool.prototype._close = async function _close() { - await this.disconnect(); -}; - /** * Connect to the network. * @method @@ -286,7 +285,7 @@ Pool.prototype.connect = async function connect() { */ Pool.prototype._connect = async function _connect() { - assert(this.loaded, 'Pool is not loaded.'); + assert(this.opened, 'Pool is not opened.'); if (this.connected) return; @@ -329,8 +328,6 @@ Pool.prototype.disconnect = async function disconnect() { */ Pool.prototype._disconnect = async function _disconnect() { - assert(this.loaded, 'Pool is not loaded.'); - if (!this.connected) return; @@ -606,7 +603,7 @@ Pool.prototype.handleSocket = function handleSocket(socket) { const host = IP.toHostname(ip, socket.remotePort); - assert(!this.peers.map[host], 'Port collision.'); + assert(!this.peers.map.has(host), 'Port collision.'); this.addInbound(socket); }; @@ -618,7 +615,7 @@ Pool.prototype.handleSocket = function handleSocket(socket) { */ Pool.prototype.addLoader = function addLoader() { - if (!this.loaded) + if (!this.opened) return; assert(!this.peers.load); @@ -657,7 +654,7 @@ Pool.prototype.addLoader = function addLoader() { */ Pool.prototype.setLoader = function setLoader(peer) { - if (!this.loaded) + if (!this.opened) return; assert(peer.outbound); @@ -677,7 +674,7 @@ Pool.prototype.setLoader = function setLoader(peer) { */ Pool.prototype.startSync = function startSync() { - if (!this.loaded) + if (!this.opened) return; assert(this.connected, 'Pool is not connected!'); @@ -691,7 +688,7 @@ Pool.prototype.startSync = function startSync() { */ Pool.prototype.forceSync = function forceSync() { - if (!this.loaded) + if (!this.opened) return; assert(this.connected, 'Pool is not connected!'); @@ -1409,7 +1406,7 @@ Pool.prototype.handleClose = async function handleClose(peer, connected) { this.emit('peer close', peer, connected); - if (!this.loaded) + if (!this.opened) return; if (this.disconnecting) @@ -1773,6 +1770,7 @@ Pool.prototype.handleGetData = async function handleGetData(peer, packet) { } const notFound = []; + let txs = 0; let blocks = 0; let compact = 0; @@ -3018,7 +3016,7 @@ Pool.prototype.handleUnknown = async function handleUnknown(peer, packet) { */ Pool.prototype.addInbound = function addInbound(socket) { - if (!this.loaded) { + if (!this.opened) { socket.destroy(); return; } @@ -3088,7 +3086,7 @@ Pool.prototype.getHost = function getHost() { */ Pool.prototype.addOutbound = function addOutbound() { - if (!this.loaded) + if (!this.opened) return; if (this.peers.outbound >= this.options.maxOutbound) @@ -3340,7 +3338,7 @@ Pool.prototype.getBlocks = async function getBlocks(peer, tip, stop) { */ Pool.prototype.getBlock = function getBlock(peer, hashes) { - if (!this.loaded) + if (!this.opened) return; if (!peer.handshake) @@ -3384,7 +3382,7 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) { */ Pool.prototype.getTX = function getTX(peer, hashes) { - if (!this.loaded) + if (!this.opened) return; if (!peer.handshake) @@ -4017,7 +4015,10 @@ PoolOptions.prototype.getRate = function getRate(hash) { */ PoolOptions.prototype._createSocket = function _createSocket(port, host) { - return tcp.createSocket(port, host, this.proxy); + if (this.proxy) + return socks.connect(this.proxy, port, host); + + return tcp.createSocket(port, host); }; /** @@ -4029,7 +4030,7 @@ PoolOptions.prototype._createSocket = function _createSocket(port, host) { PoolOptions.prototype._resolve = function _resolve(name) { if (this.onion) - return dns.lookup(name, this.proxy); + return socks.resolve(this.proxy, name); return dns.lookup(name); }; diff --git a/lib/net/proxysocket.js b/lib/net/proxysocket.js index 769db5d7..5a7844a1 100644 --- a/lib/net/proxysocket.js +++ b/lib/net/proxysocket.js @@ -32,12 +32,12 @@ function ProxySocket(uri) { this.closed = false; - this._init(); + this.init(); } Object.setPrototypeOf(ProxySocket.prototype, EventEmitter.prototype); -ProxySocket.prototype._init = function _init() { +ProxySocket.prototype.init = function init() { this.socket.bind('info', (info) => { if (this.closed) return; diff --git a/lib/net/socks.js b/lib/net/socks.js index 1a462c71..e55993f3 100644 --- a/lib/net/socks.js +++ b/lib/net/socks.js @@ -15,8 +15,6 @@ const EventEmitter = require('events'); const net = require('net'); const {format} = require('util'); const IP = require('binet'); -const StaticWriter = require('bbuf/lib/staticwriter'); -const BufferReader = require('bbuf/lib/reader'); /** * SOCKS state machine @@ -322,15 +320,13 @@ SOCKS.prototype.sendAuth = function sendAuth() { const plen = Buffer.byteLength(pass, 'ascii'); const size = 3 + ulen + plen; - const bw = new StaticWriter(size); + const packet = Buffer.allocUnsafe(size); - bw.writeU8(0x01); - bw.writeU8(ulen); - bw.writeString(user, 'ascii'); - bw.writeU8(plen); - bw.writeString(pass, 'ascii'); - - const packet = bw.render(); + packet[0] = 0x01; + packet[1] = ulen; + packet.write(user, 2, ulen, 'ascii'); + packet[2 + ulen] = plen; + packet.write(pass, 2 + ulen, plen, 'ascii'); this.state = SOCKS.states.AUTH; this.socket.write(packet); @@ -374,6 +370,7 @@ SOCKS.prototype.auth = function auth() { SOCKS.prototype.sendProxy = function sendProxy() { const host = this.destHost; const port = this.destPort; + let ip, len, type, name; switch (IP.getStringType(host)) { @@ -396,20 +393,20 @@ SOCKS.prototype.sendProxy = function sendProxy() { break; } - const bw = new StaticWriter(6 + len); + const packet = Buffer.allocUnsafe(6 + len); - bw.writeU8(0x05); - bw.writeU8(0x01); - bw.writeU8(0x00); - bw.writeU8(type); + let off = 0; + + packet[off++] = 0x05; + packet[off++] = 0x01; + packet[off++] = 0x00; + packet[off++] = type; if (type === 0x03) - bw.writeU8(name.length); + packet[off++] = name.length; - bw.writeBytes(name); - bw.writeU16BE(port); - - const packet = bw.render(); + off += name.copy(packet, off); + packet.writeUInt32BE(port, off, true); this.state = SOCKS.states.PROXY; this.socket.write(packet); @@ -457,17 +454,15 @@ SOCKS.prototype.sendResolve = function sendResolve() { const name = this.name; const len = Buffer.byteLength(name, 'utf8'); - const bw = new StaticWriter(7 + len); + const packet = Buffer.allocUnsafe(7 + len); - bw.writeU8(0x05); - bw.writeU8(0xf0); - bw.writeU8(0x00); - bw.writeU8(0x03); - bw.writeU8(len); - bw.writeString(name, 'utf8'); - bw.writeU16BE(0); - - const packet = bw.render(); + packet[0] = 0x05; + packet[1] = 0xf0; + packet[2] = 0x00; + packet[3] = 0x03; + packet[4] = len; + packet.write(name, 5, len, 'utf8'); + packet.writeUInt32BE(0, 5 + len, true); this.state = SOCKS.states.RESOLVE; this.socket.write(packet); @@ -705,42 +700,47 @@ function parseProxy(host) { }; } -function parseAddr(data, offset) { - const br = new BufferReader(data); - - if (br.left() < offset + 2) +function parseAddr(data, off) { + if (data.length - off < 2) throw new Error('Bad SOCKS address length.'); - br.seek(offset); + const type = data[off]; + off += 1; - const type = br.readU8(); let host, port; switch (type) { case 0x01: { - if (br.left() < 6) + if (data.length - off < 6) throw new Error('Bad SOCKS ipv4 length.'); - host = IP.toString(br.readBytes(4)); - port = br.readU16BE(); + host = IP.toString(data.slice(off, off + 4)); + off += 4; + + port = data.readUInt16BE(off, true); break; } case 0x03: { - const len = br.readU8(); + const len = data[off]; + off += 1; - if (br.left() < len + 2) + if (data.length - off < len + 2) throw new Error('Bad SOCKS domain length.'); - host = br.readString(len, 'utf8'); - port = br.readU16BE(); + host = data.toString('utf8', off, off + len); + off += len; + + port = data.readUInt16BE(off, true); break; } case 0x04: { - if (br.left() < 18) + if (data.length - off < 18) throw new Error('Bad SOCKS ipv6 length.'); - host = IP.toString(br.readBytes(16)); - port = br.readU16BE(); + host = IP.toString(data.slice(off, off + 16)); + off += 16; + + port = data.readUInt16BE(off, true); break; } default: { @@ -748,11 +748,7 @@ function parseAddr(data, offset) { } } - return { - type: type, - host: host, - port: port - }; + return { type, host, port }; } /* diff --git a/lib/net/upnp.js b/lib/net/upnp.js index edad311e..94fa6a79 100644 --- a/lib/net/upnp.js +++ b/lib/net/upnp.js @@ -10,16 +10,14 @@ const assert = require('assert'); const dgram = require('dgram'); const url = require('url'); const breq = require('breq'); -const co = require('../utils/co'); -const Lock = require('../utils/lock'); const IP = require('binet'); /** * UPNP * @alias module:net.UPNP * @constructor - * @param {String?} host - Multicast IP. - * @param {Number?} port - Multicast port. + * @param {String} [host=239.255.255.250] - Multicast IP. + * @param {Number} [port=1900] - Multicast port. * @param {String?} gateway - Gateway name. */ @@ -30,7 +28,6 @@ function UPNP(host, port, gateway) { this.host = host || '239.255.255.250'; this.port = port || 1900; this.gateway = gateway || UPNP.INTERNET_GATEWAY; - this.locker = new Lock(); this.timeout = null; this.job = null; } @@ -133,25 +130,14 @@ UPNP.prototype.stopTimeout = function stopTimeout() { /** * Discover gateway. - * @returns {Promise} Location string. - */ - -UPNP.prototype.discover = async function discover() { - const unlock = await this.locker.lock(); - try { - return await this._discover(); - } finally { - unlock(); - } -}; - -/** - * Discover gateway (without a lock). * @private * @returns {Promise} Location string. */ -UPNP.prototype._discover = async function _discover() { +UPNP.prototype.discover = async function discover() { + if (this.job) + throw new Error('Job already in progress.'); + const socket = dgram.createSocket('udp4'); socket.on('error', (err) => { @@ -175,8 +161,8 @@ UPNP.prototype._discover = async function _discover() { socket.send(msg, this.port, this.host); - return await new Promise((resolve, reject) => { - this.job = co.job(resolve, reject); + return new Promise((resolve, reject) => { + this.job = { resolve, reject }; }); }; diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index f674be86..a5d6e567 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -7,6 +7,7 @@ 'use strict'; +const assert = require('assert'); const Chain = require('../blockchain/chain'); const Fees = require('../mempool/fees'); const Mempool = require('../mempool/mempool'); @@ -43,6 +44,8 @@ function FullNode(options) { Node.call(this, 'bcoin', 'bcoin.conf', 'debug.log', options); + this.opened = false; + // SPV flag. this.spv = false; @@ -104,6 +107,7 @@ function FullNode(options) { identityKey: this.config.buf('identity-key'), maxOutbound: this.config.uint('max-outbound'), maxInbound: this.config.uint('max-inbound'), + createSocket: this.config.func('create-socket'), proxy: this.config.str('proxy'), onion: this.config.bool('onion'), upnp: this.config.bool('upnp'), @@ -151,7 +155,7 @@ function FullNode(options) { noAuth: this.config.bool('no-auth') }); - this._init(); + this.init(); } Object.setPrototypeOf(FullNode.prototype, Node.prototype); @@ -161,7 +165,7 @@ Object.setPrototypeOf(FullNode.prototype, Node.prototype); * @private */ -FullNode.prototype._init = function _init() { +FullNode.prototype.init = function init() { // Bind to errors this.chain.on('error', err => this.error(err)); this.mempool.on('error', err => this.error(err)); @@ -176,7 +180,7 @@ FullNode.prototype._init = function _init() { this.emit('tx', tx); }); - this.chain.hook('connect', async (entry, block) => { + this.chain.on('connect', async (entry, block) => { try { await this.mempool._addBlock(entry, block.txs); } catch (e) { @@ -186,7 +190,7 @@ FullNode.prototype._init = function _init() { this.emit('connect', entry, block); }); - this.chain.hook('disconnect', async (entry, block) => { + this.chain.on('disconnect', async (entry, block) => { try { await this.mempool._removeBlock(entry, block.txs); } catch (e) { @@ -195,7 +199,7 @@ FullNode.prototype._init = function _init() { this.emit('disconnect', entry, block); }); - this.chain.hook('reorganize', async (tip, competitor) => { + this.chain.on('reorganize', async (tip, competitor) => { try { await this.mempool._handleReorg(); } catch (e) { @@ -204,7 +208,7 @@ FullNode.prototype._init = function _init() { this.emit('reorganize', tip, competitor); }); - this.chain.hook('reset', async (tip) => { + this.chain.on('reset', async (tip) => { try { await this.mempool._reset(); } catch (e) { @@ -223,7 +227,11 @@ FullNode.prototype._init = function _init() { * @returns {Promise} */ -FullNode.prototype._open = async function _open() { +FullNode.prototype.open = async function open() { + assert(!this.opened, 'FullNode is already open.'); + this.opened = true; + + await this.handlePreopen(); await this.chain.open(); await this.mempool.open(); await this.miner.open(); @@ -231,8 +239,8 @@ FullNode.prototype._open = async function _open() { await this.openPlugins(); - if (this.http) - await this.http.open(); + await this.http.open(); + await this.handleOpen(); this.logger.info('Node is loaded.'); }; @@ -243,9 +251,12 @@ FullNode.prototype._open = async function _open() { * @returns {Promise} */ -FullNode.prototype._close = async function _close() { - if (this.http) - await this.http.close(); +FullNode.prototype.close = async function close() { + assert(this.opened, 'FullNode is not open.'); + this.opened = false; + + await this.handlePreclose(); + await this.http.close(); await this.closePlugins(); @@ -253,6 +264,7 @@ FullNode.prototype._close = async function _close() { await this.miner.close(); await this.mempool.close(); await this.chain.close(); + await this.handleClose(); this.logger.info('Node is closed.'); }; diff --git a/lib/node/node.js b/lib/node/node.js index 484e86cb..ec5989dd 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -8,11 +8,11 @@ 'use strict'; const assert = require('assert'); +const EventEmitter = require('events'); const fs = require('bfile'); const Logger = require('blgr'); const Config = require('bcfg'); const Network = require('../protocol/network'); -const AsyncObject = require('../utils/asyncobject'); const WorkerPool = require('../workers/workerpool'); /** @@ -28,7 +28,7 @@ function Node(module, config, file, options) { if (!(this instanceof Node)) return new Node(module, config, file, options); - AsyncObject.call(this); + EventEmitter.call(this); this.config = new Config(module, { suffix: 'network', @@ -59,18 +59,18 @@ function Node(module, config, file, options) { this.miner = null; this.http = null; - this.init(file); + this._init(file); } -Object.setPrototypeOf(Node.prototype, AsyncObject.prototype); +Object.setPrototypeOf(Node.prototype, EventEmitter.prototype); /** - * Initialize options. + * Initialize node. * @private * @param {Object} options */ -Node.prototype.initOptions = function initOptions(file) { +Node.prototype._init = function _init(file) { const config = this.config; let logger = new Logger(); @@ -95,16 +95,6 @@ Node.prototype.initOptions = function initOptions(file) { timeout: config.uint('workers-timeout'), file: config.str('worker-file') }); -}; - -/** - * Initialize node. - * @private - * @param {Object} options - */ - -Node.prototype.init = function init(file) { - this.initOptions(file); this.on('error', () => {}); @@ -128,11 +118,6 @@ Node.prototype.init = function init(file) { } this.emit('error', err); }); - - this.hook('preopen', () => this.handlePreopen()); - this.hook('preclose', () => this.handlePreclose()); - this.hook('open', () => this.handleOpen()); - this.hook('close', () => this.handleClose()); }; /** @@ -166,17 +151,17 @@ Node.prototype.handlePreopen = async function handlePreopen() { await this.logger.open(); await this.workers.open(); - this.bind(this.network.time, 'offset', (offset) => { + this._bind(this.network.time, 'offset', (offset) => { this.logger.info('Time offset: %d (%d minutes).', offset, offset / 60 | 0); }); - this.bind(this.network.time, 'sample', (sample, total) => { + this._bind(this.network.time, 'sample', (sample, total) => { this.logger.debug( 'Added time data: samples=%d, offset=%d (%d minutes).', total, sample, sample / 60 | 0); }); - this.bind(this.network.time, 'mismatch', () => { + this._bind(this.network.time, 'mismatch', () => { this.logger.warning('Adjusted time mismatch!'); this.logger.warning('Please make sure your system clock is correct!'); }); @@ -229,7 +214,7 @@ Node.prototype.handleClose = async function handleClose() { * @param {Function} listener */ -Node.prototype.bind = function bind(obj, event, listener) { +Node.prototype._bind = function _bind(obj, event, listener) { this.bound.push([obj, event, listener]); obj.on(event, listener); }; diff --git a/lib/node/spvnode.js b/lib/node/spvnode.js index ad789b21..8da0137e 100644 --- a/lib/node/spvnode.js +++ b/lib/node/spvnode.js @@ -7,6 +7,7 @@ 'use strict'; +const assert = require('assert'); const Lock = require('../utils/lock'); const Chain = require('../blockchain/chain'); const Pool = require('../net/pool'); @@ -40,6 +41,8 @@ function SPVNode(options) { Node.call(this, 'bcoin', 'bcoin.conf', 'debug.log', options); + this.opened = false; + // SPV flag. this.spv = true; @@ -73,6 +76,7 @@ function SPVNode(options) { bip150: this.config.bool('bip150'), identityKey: this.config.buf('identity-key'), maxOutbound: this.config.uint('max-outbound'), + createSocket: this.config.func('create-socket'), persistent: this.config.bool('persistent'), selfish: true, listen: false @@ -98,7 +102,7 @@ function SPVNode(options) { this.scanLock = new Lock(); this.watchLock = new Lock(); - this._init(); + this.init(); } Object.setPrototypeOf(SPVNode.prototype, Node.prototype); @@ -108,7 +112,7 @@ Object.setPrototypeOf(SPVNode.prototype, Node.prototype); * @private */ -SPVNode.prototype._init = function _init() { +SPVNode.prototype.init = function init() { // Bind to errors this.chain.on('error', err => this.error(err)); this.pool.on('error', err => this.error(err)); @@ -158,36 +162,42 @@ SPVNode.prototype._init = function _init() { /** * Open the node and all its child objects, * wait for the database to load. - * @alias SPVNode#open * @returns {Promise} */ -SPVNode.prototype._open = async function _open(callback) { +SPVNode.prototype.open = async function open() { + assert(!this.opened, 'SPVNode is already open.'); + this.opened = true; + + await this.handlePreopen(); await this.chain.open(); await this.pool.open(); await this.openPlugins(); - if (this.http) - await this.http.open(); + await this.http.open(); + await this.handleOpen(); this.logger.info('Node is loaded.'); }; /** * Close the node, wait for the database to close. - * @alias SPVNode#close * @returns {Promise} */ -SPVNode.prototype._close = async function _close() { - if (this.http) - await this.http.close(); +SPVNode.prototype.close = async function close() { + assert(this.opened, 'SPVNode is not open.'); + this.opened = false; + + await this.handlePreclose(); + await this.http.close(); await this.closePlugins(); await this.pool.close(); await this.chain.close(); + await this.handleClose(); }; /** diff --git a/lib/protocol/network.js b/lib/protocol/network.js index 89c15fd0..99ce524a 100644 --- a/lib/protocol/network.js +++ b/lib/protocol/network.js @@ -56,7 +56,7 @@ function Network(options) { this.requestMempool = options.requestMempool; this.time = new TimeData(); - this._init(); + this.init(); } /** @@ -89,7 +89,7 @@ Network.simnet = null; * @returns {Object} */ -Network.prototype._init = function _init() { +Network.prototype.init = function init() { let bits = 0; for (const deployment of this.deploys) diff --git a/lib/utils/asyncemitter.js b/lib/utils/asyncemitter.js index 8ece166b..422587d2 100644 --- a/lib/utils/asyncemitter.js +++ b/lib/utils/asyncemitter.js @@ -209,48 +209,6 @@ AsyncEmitter.prototype.listenerCount = function listenerCount(type) { return listeners.length; }; -/** - * Add a listener. - * @param {String} type - * @param {Function} handler - */ - -AsyncEmitter.prototype.bind = function bind(type, handler) { - return this.on(type, handler); -}; - -/** - * Emit event. - * @param {String} type - * @param {...Object} args - * @returns {Promise} - */ - -AsyncEmitter.prototype.fire = function fire() { - return this.emit.apply(this, arguments); -}; - -/** - * Add a listener. - * @param {String} type - * @param {Function} handler - */ - -AsyncEmitter.prototype.hook = function hook(type, handler) { - return this.on(type, handler); -}; - -/** - * Emit event. - * @param {String} type - * @param {...Object} args - * @returns {Promise} - */ - -AsyncEmitter.prototype.call = function call() { - return this.emitAsync.apply(this, arguments); -}; - /** * Emit an event synchronously. * @param {String} type @@ -306,7 +264,7 @@ AsyncEmitter.prototype._emit = function _emit(type) { if (listener.once) { listeners.splice(i, 1); - i--; + i -= 1; } switch (arguments.length) { @@ -390,7 +348,7 @@ AsyncEmitter.prototype._emitAsync = async function _emitAsync(type) { if (listener.once) { listeners.splice(i, 1); - i--; + i -= 1; } switch (arguments.length) { diff --git a/lib/utils/asyncobject.js b/lib/utils/asyncobject.js deleted file mode 100644 index a0fe11b1..00000000 --- a/lib/utils/asyncobject.js +++ /dev/null @@ -1,252 +0,0 @@ -/*! - * async.js - async object class for bcoin - * Copyright (c) 2016-2017, Christopher Jeffrey (MIT License). - * https://github.com/bcoin-org/bcoin - */ - -'use strict'; - -const assert = require('assert'); -const EventEmitter = require('events'); -const Lock = require('./lock'); - -/** - * An abstract object that handles state and - * provides recallable open and close methods. - * @alias module:utils.AsyncObject - * @constructor - * @property {Boolean} loading - * @property {Boolean} closing - * @property {Boolean} loaded - */ - -function AsyncObject() { - assert(this instanceof AsyncObject); - - EventEmitter.call(this); - - this._asyncLock = new Lock(); - this._hooks = Object.create(null); - - this.loading = false; - this.closing = false; - this.loaded = false; -} - -Object.setPrototypeOf(AsyncObject.prototype, EventEmitter.prototype); - -/** - * Open the object (recallable). - * @method - * @returns {Promise} - */ - -AsyncObject.prototype.open = async function open() { - const unlock = await this._asyncLock.lock(); - try { - return await this.__open(); - } finally { - unlock(); - } -}; - -/** - * Open the object (without a lock). - * @method - * @private - * @returns {Promise} - */ - -AsyncObject.prototype.__open = async function __open() { - if (this.loaded) - return; - - await this.call('preopen'); - - this.loading = true; - - try { - await this._open(); - } catch (e) { - this.loading = false; - this.emit('error', e); - throw e; - } - - this.loading = false; - this.loaded = true; - - await this.call('open'); -}; - -/** - * Close the object (recallable). - * @method - * @returns {Promise} - */ - -AsyncObject.prototype.close = async function close() { - const unlock = await this._asyncLock.lock(); - try { - return await this.__close(); - } finally { - unlock(); - } -}; - -/** - * Close the object (without a lock). - * @method - * @private - * @returns {Promise} - */ - -AsyncObject.prototype.__close = async function __close() { - if (!this.loaded) - return; - - await this.call('preclose'); - - this.closing = true; - - try { - await this._close(); - } catch (e) { - this.closing = false; - this.emit('error', e); - throw e; - } - - this.closing = false; - this.loaded = false; - - await this.call('close'); -}; - -/** - * Close the object (recallable). - * @method - * @returns {Promise} - */ - -AsyncObject.prototype.destroy = AsyncObject.prototype.close; - -/** - * Initialize the object. - * @private - * @returns {Promise} - */ - -AsyncObject.prototype._open = function _open(callback) { - throw new Error('Abstract method.'); -}; - -/** - * Close the object. - * @private - * @returns {Promise} - */ - -AsyncObject.prototype._close = function _close(callback) { - throw new Error('Abstract method.'); -}; - -/** - * Add a listener. - * @param {String} type - * @param {Function} handler - */ - -AsyncObject.prototype.bind = function bind(type, handler) { - return this.on(type, handler); -}; - -/** - * Emit event. - * @param {String} type - * @param {...Object} args - * @returns {Promise} - */ - -AsyncObject.prototype.fire = function fire() { - return this.emit.apply(this, arguments); -}; - -/** - * Add a hook listener. - * @param {String} type - * @param {Function} handler - */ - -AsyncObject.prototype.hook = function hook(type, handler) { - assert(typeof type === 'string', '`type` must be a string.'); - - if (!this._hooks[type]) - this._hooks[type] = []; - - this._hooks[type].push(handler); -}; - -/** - * Emit events and hooks for type. - * @method - * @param {String} type - * @param {...Object} args - * @returns {Promise} - */ - -AsyncObject.prototype.call = async function call() { - await this.callHook.apply(this, arguments); - this.emit.apply(this, arguments); -}; - -/** - * Emit an asynchronous event (hook). - * Wait for promises to resolve. - * @method - * @param {String} type - * @param {...Object} args - * @returns {Promise} - */ - -AsyncObject.prototype.callHook = async function callHook(type) { - assert(typeof type === 'string', '`type` must be a string.'); - - const listeners = this._hooks[type]; - - if (!listeners || listeners.length === 0) - return; - - let args; - - for (const handler of listeners) { - switch (arguments.length) { - case 1: - await handler(); - break; - case 2: - await handler(arguments[1]); - break; - case 3: - await handler(arguments[1], arguments[2]); - break; - case 4: - await handler(arguments[1], arguments[2], arguments[3]); - break; - default: - if (!args) { - args = new Array(arguments.length - 1); - for (let i = 1; i < arguments.length; i++) - args[i - 1] = arguments[i]; - } - await handler.apply(null, args); - break; - } - } -}; - -/* - * Expose - */ - -module.exports = AsyncObject; diff --git a/lib/utils/index.js b/lib/utils/index.js index d51b2d2a..fd7d98c4 100644 --- a/lib/utils/index.js +++ b/lib/utils/index.js @@ -11,12 +11,10 @@ */ exports.AsyncEmitter = require('./asyncemitter'); -exports.AsyncObject = require('./asyncobject'); exports.binary = require('./binary'); exports.co = require('./co'); exports.enforce = require('./enforce'); exports.fixed = require('./fixed'); -exports.Heap = require('./heap'); exports.List = require('./list'); exports.Lock = require('./lock'); exports.LRU = require('./lru'); diff --git a/lib/wallet/nodeclient.js b/lib/wallet/nodeclient.js index d5a5ed2a..2381a947 100644 --- a/lib/wallet/nodeclient.js +++ b/lib/wallet/nodeclient.js @@ -6,7 +6,8 @@ 'use strict'; -const AsyncObject = require('../utils/asyncobject'); +const assert = require('assert'); +const AsyncEmitter = require('../utils/asyncemitter'); /** * NodeClient @@ -19,47 +20,47 @@ function NodeClient(node) { if (!(this instanceof NodeClient)) return new NodeClient(node); - AsyncObject.call(this); + AsyncEmitter.call(this); this.node = node; this.network = node.network; this.filter = null; - this.listening = false; + this.opened = false; - this._init(); + this.init(); } -Object.setPrototypeOf(NodeClient.prototype, AsyncObject.prototype); +Object.setPrototypeOf(NodeClient.prototype, AsyncEmitter.prototype); /** * Initialize the client. * @returns {Promise} */ -NodeClient.prototype._init = function _init() { +NodeClient.prototype.init = function init() { this.node.on('connect', (entry, block) => { - if (!this.listening) + if (!this.opened) return; this.emit('block connect', entry, block.txs); }); this.node.on('disconnect', (entry, block) => { - if (!this.listening) + if (!this.opened) return; this.emit('block disconnect', entry); }); this.node.on('tx', (tx) => { - if (!this.listening) + if (!this.opened) return; this.emit('tx', tx); }); this.node.on('reset', (tip) => { - if (!this.listening) + if (!this.opened) return; this.emit('chain reset', tip); @@ -71,8 +72,9 @@ NodeClient.prototype._init = function _init() { * @returns {Promise} */ -NodeClient.prototype._open = async function _open(options) { - this.listening = true; +NodeClient.prototype.open = async function open(options) { + assert(!this.opened, 'NodeClient is already open.'); + this.opened = true; setImmediate(() => this.emit('connect')); }; @@ -81,11 +83,32 @@ NodeClient.prototype._open = async function _open(options) { * @returns {Promise} */ -NodeClient.prototype._close = async function _close() { - this.listening = false; +NodeClient.prototype.close = async function close() { + assert(this.opened, 'NodeClient is not open.'); + this.opened = false; setImmediate(() => this.emit('disconnect')); }; +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +NodeClient.prototype.bind = function bind(type, handler) { + return this.on(type, handler); +}; + +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +NodeClient.prototype.hook = function hook(type, handler) { + return this.on(type, handler); +}; + /** * Get chain tip. * @returns {Promise} @@ -187,7 +210,7 @@ NodeClient.prototype.getHashes = async function getHashes(start = -1, end = -1) NodeClient.prototype.rescan = async function rescan(start) { return this.node.chain.scan(start, this.filter, (entry, txs) => { - return this.call('block rescan', entry, txs); + return this.emitAsync('block rescan', entry, txs); }); }; diff --git a/lib/wallet/nullclient.js b/lib/wallet/nullclient.js new file mode 100644 index 00000000..0229ae1a --- /dev/null +++ b/lib/wallet/nullclient.js @@ -0,0 +1,170 @@ +/*! + * nullclient.js - node client for bcoin + * Copyright (c) 2014-2017, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +const assert = require('assert'); +const EventEmitter = require('events'); + +/** + * NullClient + * Sort of a fake local client for separation of concerns. + * @alias module:node.NullClient + * @constructor + */ + +function NullClient(wdb) { + if (!(this instanceof NullClient)) + return new NullClient(wdb); + + EventEmitter.call(this); + + this.wdb = wdb; + this.network = wdb.network; + this.opened = false; +} + +Object.setPrototypeOf(NullClient.prototype, EventEmitter.prototype); + +/** + * Open the client. + * @returns {Promise} + */ + +NullClient.prototype.open = async function open(options) { + assert(!this.opened, 'NullClient is already open.'); + this.opened = true; + setImmediate(() => this.emit('connect')); +}; + +/** + * Close the client. + * @returns {Promise} + */ + +NullClient.prototype.close = async function close() { + assert(this.opened, 'NullClient is not open.'); + this.opened = false; + setImmediate(() => this.emit('disconnect')); +}; + +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +NullClient.prototype.bind = function bind(type, handler) { + return this.on(type, handler); +}; + +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +NullClient.prototype.hook = function hook(type, handler) { + return this.on(type, handler); +}; + +/** + * Get chain tip. + * @returns {Promise} + */ + +NullClient.prototype.getTip = async function getTip() { + const {hash, height, time} = this.network.genesis; + return { hash, height, time }; +}; + +/** + * Get chain entry. + * @param {Hash} hash + * @returns {Promise} + */ + +NullClient.prototype.getEntry = async function getEntry(hash) { + return { hash, height: 0, time: 0 }; +}; + +/** + * Send a transaction. Do not wait for promise. + * @param {TX} tx + * @returns {Promise} + */ + +NullClient.prototype.send = async function send(tx) { + this.wdb.emit('send', tx); +}; + +/** + * Set bloom filter. + * @param {Bloom} filter + * @returns {Promise} + */ + +NullClient.prototype.setFilter = async function setFilter(filter) { + this.wdb.emit('set filter', filter); +}; + +/** + * Add data to filter. + * @param {Buffer} data + * @returns {Promise} + */ + +NullClient.prototype.addFilter = async function addFilter(data) { + this.wdb.emit('add filter', data); +}; + +/** + * Reset filter. + * @returns {Promise} + */ + +NullClient.prototype.resetFilter = async function resetFilter() { + this.wdb.emit('reset filter'); +}; + +/** + * Esimate smart fee. + * @param {Number?} blocks + * @returns {Promise} + */ + +NullClient.prototype.estimateFee = async function estimateFee(blocks) { + return this.network.feeRate; +}; + +/** + * Get hash range. + * @param {Number} start + * @param {Number} end + * @returns {Promise} + */ + +NullClient.prototype.getHashes = async function getHashes(start = -1, end = -1) { + return [this.network.genesis.hash]; +}; + +/** + * Rescan for any missed transactions. + * @param {Number|Hash} start - Start block. + * @param {Bloom} filter + * @param {Function} iter - Iterator. + * @returns {Promise} + */ + +NullClient.prototype.rescan = async function rescan(start) { + ; +}; + +/* + * Expose + */ + +module.exports = NullClient; diff --git a/lib/wallet/plugin.js b/lib/wallet/plugin.js index 4cd4ec25..98286bd8 100644 --- a/lib/wallet/plugin.js +++ b/lib/wallet/plugin.js @@ -82,7 +82,7 @@ Plugin.prototype.open = async function open() { Plugin.prototype.close = async function close() { this.rpc.wallet = this.wdb.primary; - await this.wdb.open(); + await this.wdb.close(); }; /** diff --git a/lib/wallet/server.js b/lib/wallet/server.js index 162aa79f..20bdbbd3 100644 --- a/lib/wallet/server.js +++ b/lib/wallet/server.js @@ -6,6 +6,7 @@ 'use strict'; +const assert = require('assert'); const Node = require('../node/node'); const WalletDB = require('./walletdb'); const HTTP = require('./http'); @@ -24,6 +25,8 @@ function WalletNode(options) { Node.call(this, 'bcoin', 'wallet.conf', 'wallet.log', options); + this.opened = false; + this.client = new Client({ network: this.network, url: this.config.str('node-url'), @@ -66,7 +69,7 @@ function WalletNode(options) { walletAuth: this.config.bool('wallet-auth') }); - this._init(); + this.init(); } Object.setPrototypeOf(WalletNode.prototype, Node.prototype); @@ -76,7 +79,7 @@ Object.setPrototypeOf(WalletNode.prototype, Node.prototype); * @private */ -WalletNode.prototype._init = function _init() { +WalletNode.prototype.init = function init() { this.wdb.on('error', err => this.error(err)); this.http.on('error', err => this.error(err)); @@ -90,8 +93,11 @@ WalletNode.prototype._init = function _init() { * @returns {Promise} */ -WalletNode.prototype._open = async function _open(callback) { - // await this.client.open(); +WalletNode.prototype.open = async function open() { + assert(!this.opened, 'WalletNode is already open.'); + this.opened = true; + + await this.handlePreopen(); await this.wdb.open(); this.rpc.wallet = this.wdb.primary; @@ -99,6 +105,7 @@ WalletNode.prototype._open = async function _open(callback) { await this.openPlugins(); await this.http.open(); + await this.handleOpen(); this.logger.info('Wallet node is loaded.'); }; @@ -109,14 +116,19 @@ WalletNode.prototype._open = async function _open(callback) { * @returns {Promise} */ -WalletNode.prototype._close = async function _close() { +WalletNode.prototype.close = async function close() { + assert(this.opened, 'WalletNode is not open.'); + this.opened = false; + + await this.handlePreclose(); await this.http.close(); await this.closePlugins(); this.rpc.wallet = null; + await this.wdb.close(); - // await this.client.close(); + await this.handleClose(); }; /* diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index 4ed7b9b3..96e4d797 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -9,24 +9,24 @@ const assert = require('assert'); const path = require('path'); +const EventEmitter = require('events'); const BDB = require('bdb'); -const AsyncObject = require('../utils/asyncobject'); -const Lock = require('../utils/lock'); -const MappedLock = require('../utils/mappedlock'); +const Logger = require('blgr'); const encoding = require('bbuf/lib/encoding'); const ccmp = require('bcrypto/lib/ccmp'); const aes = require('bcrypto/lib/aes'); +const BloomFilter = require('bfilter/lib/bloom'); +const StaticWriter = require('bbuf/lib/staticwriter'); +const Lock = require('../utils/lock'); +const MappedLock = require('../utils/mappedlock'); const Network = require('../protocol/network'); const Path = require('./path'); const common = require('./common'); const Wallet = require('./wallet'); const Account = require('./account'); -const BloomFilter = require('bfilter/lib/bloom'); -const Logger = require('blgr'); const Outpoint = require('../primitives/outpoint'); const layouts = require('./layout'); const records = require('./records'); -const StaticWriter = require('bbuf/lib/staticwriter'); const NullClient = require('./nullclient'); const layout = layouts.walletdb; const ChainState = records.ChainState; @@ -46,7 +46,7 @@ function WalletDB(options) { if (!(this instanceof WalletDB)) return new WalletDB(options); - AsyncObject.call(this); + EventEmitter.call(this); this.options = new WalletOptions(options); @@ -76,10 +76,10 @@ function WalletDB(options) { // Address and outpoint filter. this.filter = new BloomFilter(); - this._init(); + this.init(); } -Object.setPrototypeOf(WalletDB.prototype, AsyncObject.prototype); +Object.setPrototypeOf(WalletDB.prototype, EventEmitter.prototype); /** * Database layout. @@ -93,7 +93,7 @@ WalletDB.layout = layout; * @private */ -WalletDB.prototype._init = function _init() { +WalletDB.prototype.init = function init() { let items = 3000000; let flag = -1; @@ -108,7 +108,7 @@ WalletDB.prototype._init = function _init() { } this.filter = BloomFilter.fromRate(items, 0.001, flag); - this.bind(); + this._bind(); }; /** @@ -116,7 +116,7 @@ WalletDB.prototype._init = function _init() { * @private */ -WalletDB.prototype.bind = function bind() { +WalletDB.prototype._bind = function _bind() { this.client.on('error', (err) => { this.emit('error', err); }); @@ -172,11 +172,10 @@ WalletDB.prototype.bind = function bind() { /** * Open the walletdb, wait for the database to load. - * @alias WalletDB#open * @returns {Promise} */ -WalletDB.prototype._open = async function _open() { +WalletDB.prototype.open = async function open() { await this.db.open(); await this.db.checkVersion('V', 7); @@ -209,11 +208,10 @@ WalletDB.prototype._open = async function _open() { /** * Close the walletdb, wait for the database to close. - * @alias WalletDB#close * @returns {Promise} */ -WalletDB.prototype._close = async function _close() { +WalletDB.prototype.close = async function close() { await this.disconnect(); for (const wallet of this.wallets.values()) { diff --git a/test/mempool-test.js b/test/mempool-test.js index 27467033..b9314734 100644 --- a/test/mempool-test.js +++ b/test/mempool-test.js @@ -64,6 +64,8 @@ describe('Mempool', function() { this.timeout(5000); it('should open mempool', async () => { + await workers.open(); + await chain.open(); await mempool.open(); chain.state.flags |= Script.flags.VERIFY_WITNESS; }); @@ -349,5 +351,7 @@ describe('Mempool', function() { it('should destroy mempool', async () => { await mempool.close(); + await chain.close(); + await workers.close(); }); });