From 86b228ca07bda74a74748251eb1f20d172c0ac18 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Mon, 22 Feb 2016 03:09:48 -0800 Subject: [PATCH] add utils.once. some async functions. disable blockdb truncate. --- lib/bcoin/blockdb.js | 33 +++++++++++--- lib/bcoin/chain.js | 31 ++++--------- lib/bcoin/utils.js | 106 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 123 insertions(+), 47 deletions(-) diff --git a/lib/bcoin/blockdb.js b/lib/bcoin/blockdb.js index 48a8111e..7ca41a32 100644 --- a/lib/bcoin/blockdb.js +++ b/lib/bcoin/blockdb.js @@ -86,6 +86,7 @@ BlockDB.prototype.close = function close(callback) { BlockDB.prototype.migrate = function migrate(blockSize, compression, callback) { var options, db, pending, stream, total, done; + callback = utils.once(callback); options = utils.merge({}, this.index.options); if (blockSize != null) @@ -266,6 +267,8 @@ BlockDB.prototype.saveBlock = function saveBlock(block, callback) { BlockDB.prototype.removeBlock = function removeBlock(hash, callback) { var self = this; + callback = utils.once(callback); + this.getBlock(hash, function(err, block) { var batch, pending; @@ -293,6 +296,8 @@ BlockDB.prototype.removeBlock = function removeBlock(hash, callback) { // can ONLY remove the last block. assert(block._fileOffset >= 0); assert(block._fileOffset < self.data.size); + // XXX This seems to be truncating too much right now + return callback(null, block); self.data.truncateAsync(block._fileOffset, function(err) { if (err) return callback(err); @@ -403,6 +408,7 @@ BlockDB.prototype.fillCoins = function fillCoins(txs, callback) { var pending = txs.length; callback = utils.asyncify(callback); + callback = utils.once(callback); if (!pending) return callback(); @@ -423,6 +429,7 @@ BlockDB.prototype.fillTXs = function fillTXs(txs, callback) { var pending = txs.length; callback = utils.asyncify(callback); + callback = utils.once(callback); if (!pending) return callback(); @@ -443,6 +450,7 @@ BlockDB.prototype.fillCoin = function fillCoin(tx, callback) { var pending = tx.inputs.length; callback = utils.asyncify(callback); + callback = utils.once(callback); if (!pending) return callback(); @@ -474,6 +482,7 @@ BlockDB.prototype.fillTX = function fillTX(tx, callback) { var pending = tx.inputs.length; callback = utils.asyncify(callback); + callback = utils.once(callback); if (!pending) return callback(); @@ -507,6 +516,8 @@ BlockDB.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, call var coins = []; var pending; + callback = utils.once(callback); + if (typeof addresses === 'string') addresses = [addresses]; @@ -536,6 +547,7 @@ BlockDB.prototype._getCoinsByAddress = function _getCoinsByAddress(address, call var stream; callback = utils.asyncify(callback); + callback = utils.once(callback); /* stream = this.index.createKeyStream({ @@ -681,6 +693,8 @@ BlockDB.prototype.getTXByAddress = function getTXByAddress(addresses, callback) var txs = []; var pending; + callback = utils.once(callback); + if (typeof addresses === 'string') addresses = [addresses]; @@ -713,6 +727,7 @@ BlockDB.prototype._getTXByAddress = function _getTXByAddress(address, callback) var stream; callback = utils.asyncify(callback); + callback = utils.once(callback); stream = this.index.createReadStream({ start: 't/a/' + address, @@ -951,7 +966,9 @@ BlockDB.prototype.hasCoin = function hasCoin(hash, index, callback) { BlockDB.prototype.hasUnspentTX = function hasUnspentTX(hash, callback) { var self = this; this.getTX(hash, function(err, tx) { - var called, hash, pending, spent; + var hash, pending, spent; + + callback = utils.once(callback); if (err) return callback(err); @@ -967,9 +984,6 @@ BlockDB.prototype.hasUnspentTX = function hasUnspentTX(hash, callback) { return callback(null, false); function done(err) { - if (called) - return; - called = true; if (err) return callback(err); return callback(null, spent < tx.outputs.length); @@ -1326,7 +1340,7 @@ BlockData.prototype._readAsync = function _readAsync(size, offset, callback) { } if (!bytes) - return callback(new Error('_readAsync() failed.')); + return callback(new Error('_readAsync() failed at offset ' + offset + ' ' + size + ' to go')); index += bytes; size -= bytes; @@ -1407,8 +1421,13 @@ BlockData.prototype._writeAsync = function _writeAsync(data, offset, callback) { size -= bytes; offset += bytes; - if (index === data.length) - return callback(null, true); + if (index === data.length) { + return fs.fsync(self.fd, function(err) { + if (err) + return callback(err); + return callback(null, true); + }); + } next(); }); diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 2bf2d814..e3faeda6 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -572,7 +572,6 @@ Chain.prototype._checkDuplicates = function _checkDuplicates(block, prev, callba var self = this; var height = prev.height + 1; var pending = block.txs.length; - var called; if (!this.blockdb || block.subtype !== 'block') return callback(null, true); @@ -582,12 +581,7 @@ Chain.prototype._checkDuplicates = function _checkDuplicates(block, prev, callba assert(pending); - function done(err, result) { - if (called) - return; - called = true; - callback(err, result); - } + callback = utils.once(callback); // Check all transactions block.txs.forEach(function(tx) { @@ -596,7 +590,7 @@ Chain.prototype._checkDuplicates = function _checkDuplicates(block, prev, callba // BIP30 - Ensure there are no duplicate txids self.blockdb.hasTX(hash, function(err, result) { if (err) - return done(err); + return callback(err); // Blocks 91842 and 91880 created duplicate // txids by using the same exact output script @@ -604,11 +598,11 @@ Chain.prototype._checkDuplicates = function _checkDuplicates(block, prev, callba if (result) { utils.debug('Block is overwriting txids: %s', block.rhash); if (!(network.type === 'main' && (height === 91842 || height === 91880))) - return done(null, false); + return callback(null, false); } if (!--pending) - return done(null, true); + return callback(null, true); }); }); }; @@ -1575,20 +1569,18 @@ Chain.prototype.getHashRange = function getHashRange(start, end) { Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callback, force) { var self = this; - var called; var unlock = this._lock(getHashRangeAsync, [start, end, callback], force); if (!unlock) return; function done(err, result) { - if (called) - return; - called = true; unlock(); callback(err, result); } + done = utils.once(done); + this.byTimeAsync(start, function(err, start) { if (err) return done(err); @@ -1680,7 +1672,7 @@ Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback, forc var hashes = []; var top = this.height; var step = 1; - var i, called, pending; + var i, pending; var unlock = this._lock(getLocatorAsync, [start, callback], force); if (!unlock) @@ -1713,19 +1705,14 @@ Chain.prototype.getLocatorAsync = function getLocatorAsync(start, callback, forc callback = utils.asyncify(callback); function done(err) { - if (called) - return; - - called = true; - unlock(); - if (err) return callback(err); - return callback(null, hashes); } + done = utils.once(done); + i = top; for (;;) { hashes.push(i); diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index e507e1cb..8a2d074d 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -468,16 +468,23 @@ RequestCache.prototype.fulfill = function fulfill(id, err, data) { utils.RequestCache = RequestCache; -utils.asyncify = function asyncify(fn) { - if (fn && fn.name === '_asyncifiedFn') - return fn; - return function _asyncifiedFn(err, data1, data2) { - if (!fn) - return err || data1; +utils.asyncify = function asyncify(callback) { + if (callback && callback._asyncified) + return callback; + + function asyncifyFn(err, result1, result2) { + if (!callback) + return err || result1; utils.nextTick(function() { - fn(err, data1, data2); + callback(err, result1, result2); }); - }; + } + + asyncifyFn._asyncified = true; + if (callback) + asyncifyFn._once = callback._once; + + return asyncifyFn; }; utils.revHex = function revHex(s) { @@ -1466,9 +1473,9 @@ utils.ccmp = function(a, b) { return res === 0; }; -utils.iter = function iter(from, to, iter, callback) { +utils.forRange = function forRange(from, to, iter, callback) { var pending = to - from; - var i; + var called, i; callback = utils.asyncify(callback); @@ -1476,19 +1483,25 @@ utils.iter = function iter(from, to, iter, callback) { return callback(); function next(err) { - if (err) + if (called) + return; + if (err) { + called = true; return callback(err); - if (!--pending) + } + if (!--pending) { + called = true; callback(); + } } for (i = from; i < to; i++) - iter(i, next); + iter(i, next, i); }; utils.forEach = function forEach(arr, iter, callback) { var pending = arr.length; - var i; + var called, i; callback = utils.asyncify(callback); @@ -1496,17 +1509,53 @@ utils.forEach = function forEach(arr, iter, callback) { return callback(); function next(err) { - if (err) + if (called) + return; + if (err) { + called = true; return callback(err); - if (!--pending) + } + if (!--pending) { + called = true; callback(); + } } - arr.forEach(function(item) { - iter(item, next); + arr.forEach(function(item, i) { + iter(item, next, i); }); }; +utils.forRangeSerial = function forRangeSerial(from, to, iter, callback) { + callback = utils.asyncify(callback); + + (function next(err) { + if (err) + return callback(err); + if (from >= to) + return callback(); + from++; + iter(from - 1, next, from - 1); + })(); +}; + +utils.forEachSerial = function forEachSerial(arr, iter, callback) { + var i = 0; + + callback = utils.asyncify(callback); + + (function next(err) { + var item; + if (err) + return callback(err); + if (i >= arr.length) + return callback(); + item = arr[i]; + i++; + iter(item, next, i - 1); + })(); +}; + utils.mb = function mb(size) { return size / 1024 / 1024 | 0; }; @@ -1525,3 +1574,24 @@ utils.inherits = function inherits(obj, from) { f.prototype = from.prototype; obj.prototype = new f; }; + +utils.once = function once(callback) { + var called; + + if (callback && callback._once) + return callback; + + function onceFn(err, result1, result2) { + if (called) + return; + called = true; + if (callback) + callback(err, result1, result2); + } + + onceFn._once = true; + if (callback) + onceFn._asyncified = callback._asyncified; + + return onceFn; +};