From ae2f90890ad18b343e7012250b377a66e75813c5 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 2 Sep 2016 18:57:23 -0700 Subject: [PATCH] wallet/node: refactor node. resend all walletdb txs. --- lib/chain/chaindb.js | 5 ++ lib/node/fullnode.js | 83 +++++++++++-------------- lib/node/node.js | 68 ++++++++++++++++++--- lib/node/spvnode.js | 134 ++++++++++++++++++++--------------------- lib/wallet/browser.js | 3 + lib/wallet/txdb.js | 6 +- lib/wallet/walletdb.js | 63 ++++++++++++++++++- 7 files changed, 234 insertions(+), 128 deletions(-) diff --git a/lib/chain/chaindb.js b/lib/chain/chaindb.js index 444a50c3..c8db0577 100644 --- a/lib/chain/chaindb.js +++ b/lib/chain/chaindb.js @@ -1704,6 +1704,11 @@ ChainDB.prototype.pruneBlock = function pruneBlock(block, callback) { }); }; +/** + * Chain State + * @constructor + */ + function ChainState() { this.tip = constants.ZERO_HASH; this.tx = 0; diff --git a/lib/node/fullnode.js b/lib/node/fullnode.js index 35ba6dad..729f935b 100644 --- a/lib/node/fullnode.js +++ b/lib/node/fullnode.js @@ -224,21 +224,6 @@ Fullnode.prototype._init = function _init() { Fullnode.prototype._open = function open(callback) { var self = this; - var options; - - function done(err) { - if (err) - return callback(err); - - self.logger.info('Node is loaded.'); - - callback(); - } - - options = { - id: 'primary', - passphrase: this.options.passphrase - }; utils.serial([ this.chain.open.bind(this.chain), @@ -246,45 +231,25 @@ Fullnode.prototype._open = function open(callback) { this.miner.open.bind(this.miner), this.pool.open.bind(this.pool), this.walletdb.open.bind(this.walletdb), - function (next) { - self.walletdb.ensure(options, function(err, wallet) { - if (err) - return callback(err); - - self.logger.info('Loaded wallet with id=%s address=%s', - wallet.id, wallet.getAddress()); - - // Set the miner payout address if the - // programmer didn't pass one in. - if (!self.miner.address) - self.miner.address = wallet.getAddress(); - - self.wallet = wallet; - - next(); - }); - }, - function(next) { - if (self.options.noScan) { - self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next); - return next(); - } - - // Always rescan to make sure we didn't - // miss anything: there is no atomicity - // between the chaindb and walletdb. - self.walletdb.rescan(self.chain.db, next); - }, - function(next) { - // Rebroadcast pending transactions. - self.wallet.resend(next); - }, + // Ensure primary wallet. + this.openWallet.bind(this), + // Rescan for any missed transactions. + this.rescan.bind(this), + // Rebroadcast pending transactions. + this.resend.bind(this), function(next) { if (!self.http) return next(); self.http.open(next); } - ], done); + ], function(err) { + if (err) + return callback(err); + + self.logger.info('Node is loaded.'); + + callback(); + }); }; /** @@ -312,6 +277,26 @@ Fullnode.prototype._close = function close(callback) { ], callback); }; +/** + * Rescan for any missed transactions. + * @param {Function} callback + */ + +Fullnode.prototype.rescan = function rescan(callback) { + if (this.options.noScan) { + this.walletdb.setTip( + this.chain.tip.hash, + this.chain.height, + callback); + return; + } + + // Always rescan to make sure we didn't + // miss anything: there is no atomicity + // between the chaindb and walletdb. + this.walletdb.rescan(this.chain.db, callback); +}; + /** * Broadcast a transaction (note that this will _not_ be verified * by the mempool - use with care, lest you get banned from diff --git a/lib/node/node.js b/lib/node/node.js index b950ab1a..8bf9c96f 100644 --- a/lib/node/node.js +++ b/lib/node/node.js @@ -10,6 +10,7 @@ var bcoin = require('../env'); var AsyncObject = require('../utils/async'); var utils = require('../utils/utils'); +var assert = utils.assert; /** * Base class from which every other @@ -36,10 +37,10 @@ function Node(options) { this.prefix = options.prefix; this.logger = options.logger; - this.mempool = null; - this.pool = null; this.chain = null; this.fees = null; + this.mempool = null; + this.pool = null; this.miner = null; this.walletdb = null; this.wallet = null; @@ -47,14 +48,6 @@ function Node(options) { this._bound = []; - if (!this.logger) { - this.logger = new bcoin.logger({ - level: options.logLevel || 'none', - console: options.logConsole, - file: options.logFile - }); - } - this.__init(); } @@ -67,6 +60,15 @@ utils.inherits(Node, AsyncObject); Node.prototype.__init = function __init() { var self = this; + + if (!this.logger) { + this.logger = new bcoin.logger({ + level: this.options.logLevel || 'none', + console: this.options.logConsole, + file: this.options.logFile + }); + } + this.on('preopen', function() { self._onOpen(); }); @@ -225,6 +227,52 @@ Node.prototype.location = function location(name) { return path; }; +/** + * Open and ensure primary wallet. + * @param {Function} callback + */ + +Node.prototype.openWallet = function openWallet(callback) { + var self = this; + var options; + + assert(!this.wallet); + + options = { + id: 'primary', + passphrase: this.options.passphrase + }; + + this.walletdb.ensure(options, function(err, wallet) { + if (err) + return callback(err); + + self.logger.info( + 'Loaded wallet with id=%s wid=%d address=%s', + wallet.id, wallet.wid, wallet.getAddress()); + + // Set the miner payout address if the + // programmer didn't pass one in. + if (self.miner) { + if (!self.miner.address) + self.miner.address = wallet.getAddress(); + } + + self.wallet = wallet; + + callback(); + }); +}; + +/** + * Resend all pending transactions. + * @param {Function} callback + */ + +Node.prototype.resend = function resend(callback) { + this.walletdb.resend(callback); +}; + /* * Expose */ diff --git a/lib/node/spvnode.js b/lib/node/spvnode.js index efd998f5..95bf08e0 100644 --- a/lib/node/spvnode.js +++ b/lib/node/spvnode.js @@ -148,80 +148,32 @@ SPVNode.prototype._init = function _init() { SPVNode.prototype._open = function open(callback) { var self = this; - var options; - function done(err) { + utils.serial([ + this.chain.open.bind(this.chain), + this.pool.open.bind(this.pool), + this.walletdb.open.bind(this.walletdb), + // Ensure primary wallet. + this.openWallet.bind(this), + // Load bloom filter. + this.openFilter.bind(this), + // Rescan for any missed transactions. + this.rescan.bind(this), + // Rebroadcast pending transactions. + this.resend.bind(this), + function(next) { + if (!self.http) + return next(); + self.http.open(next); + } + ], function(err) { if (err) return callback(err); self.logger.info('Node is loaded.'); callback(); - } - - options = { - id: 'primary', - passphrase: this.options.passphrase - }; - - // Create or load the primary wallet. - utils.serial([ - this.chain.open.bind(this.chain), - this.pool.open.bind(this.pool), - this.walletdb.open.bind(this.walletdb), - function (next) { - self.walletdb.ensure(options, function(err, wallet) { - if (err) - return callback(err); - - self.logger.info('Loaded wallet with id=%s address=%s', - wallet.id, wallet.getAddress()); - - self.wallet = wallet; - - next(); - }); - }, - function(next) { - var i; - self.walletdb.getAddressHashes(function(err, hashes) { - if (err) - return next(err); - - if (hashes.length > 0) - self.logger.info('Adding %d addresses to filter.', hashes.length); - - for (i = 0; i < hashes.length; i++) - self.pool.watch(hashes[i], 'hex'); - - next(); - }); - }, - function(next) { - if (self.options.noScan) { - self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next); - return next(); - } - - if (self.walletdb.height === 0) - return next(); - - // Always replay the last block to make - // sure we didn't miss anything: there - // is no atomicity between the chaindb - // and walletdb. - self.chain.reset(self.walletdb.height - 1, next); - }, - function(next) { - // Rebroadcast pending transactions. - self.wallet.resend(next); - }, - function(next) { - if (!self.http) - return next(); - self.http.open(next); - } - ], done); + }); }; /** @@ -247,6 +199,54 @@ SPVNode.prototype._close = function close(callback) { ], callback); }; +/** + * Initialize p2p bloom filter for address watching. + * @param {Function} callback + */ + +SPVNode.prototype.openFilter = function openFilter(callback) { + var self = this; + var i; + + this.walletdb.getAddressHashes(function(err, hashes) { + if (err) + return callback(err); + + if (hashes.length > 0) + self.logger.info('Adding %d addresses to filter.', hashes.length); + + for (i = 0; i < hashes.length; i++) + self.pool.watch(hashes[i], 'hex'); + + callback(); + }); +}; + +/** + * Rescan for any missed transactions. + * Note that this will replay the blockchain sync. + * @param {Function} callback + */ + +SPVNode.prototype.rescan = function rescan(callback) { + if (this.options.noScan) { + this.walletdb.setTip( + this.chain.tip.hash, + this.chain.height, + callback); + return; + } + + if (this.walletdb.height === 0) + return callback(); + + // Always replay the last block to make + // sure we didn't miss anything: there + // is no atomicity between the chaindb + // and walletdb. + this.chain.reset(this.walletdb.height - 1, callback); +}; + /** * Broadcast a transaction (note that this will _not_ be verified * by the mempool - use with care, lest you get banned from diff --git a/lib/wallet/browser.js b/lib/wallet/browser.js index afb7ffd7..bf16f2e0 100644 --- a/lib/wallet/browser.js +++ b/lib/wallet/browser.js @@ -51,6 +51,9 @@ layout.txdb = { prefix: function prefix(wid, key) { return 't' + pad32(wid) + key; }, + pre: function prefix(key) { + return +key.slice(1, 11); + }, hi: function hi(ch, hash, index) { return ch + hash + pad32(index); }, diff --git a/lib/wallet/txdb.js b/lib/wallet/txdb.js index 886a3114..67ff20b8 100644 --- a/lib/wallet/txdb.js +++ b/lib/wallet/txdb.js @@ -40,6 +40,9 @@ var layout = { key.copy(out, 5); return out; }, + pre: function prefix(key) { + return key.readUInt32BE(1, true); + }, hi: function hi(ch, hash, index) { var key = new Buffer(37); key[0] = ch; @@ -101,7 +104,8 @@ var layout = { return key; }, haa: function haa(key) { - return key.toString('hex', 1); + key = key.slice(6); + return key.toString('hex', 0); }, t: function t(hash) { return layout.ha(0x74, hash); diff --git a/lib/wallet/walletdb.js b/lib/wallet/walletdb.js index a3917a52..216d7ca0 100644 --- a/lib/wallet/walletdb.js +++ b/lib/wallet/walletdb.js @@ -296,7 +296,7 @@ WalletDB.prototype.getDepth = function getDepth(callback) { // walletdb.create by simply seeking to the // highest wallet wid. iter = this.db.iterator({ - gte: layout.w(0), + gte: layout.w(0x00000000), lte: layout.w(0xffffffff), reverse: true }); @@ -1119,6 +1119,67 @@ WalletDB.prototype.rescan = function rescan(chaindb, height, callback) { }); }; +/** + * Get keys of all pending transactions + * in the wallet db (for resending). + * @param {Function} callback + */ + +WalletDB.prototype.getPendingKeys = function getPendingKeys(callback) { + var layout = require('./txdb').layout; + var uniq = {}; + + this.db.iterate({ + gte: layout.prefix(0x00000000, layout.p(constants.NULL_HASH)), + lte: layout.prefix(0xffffffff, layout.p(constants.HIGH_HASH)), + keys: true, + parse: function(key) { + var wid = layout.pre(key); + var hash = layout.pp(key); + + if (uniq[hash]) + return; + + uniq[hash] = true; + + return layout.prefix(wid, layout.t(hash)); + } + }, callback); +}; + +/** + * Resend all pending transactions. + * @param {Function} callback + */ + +WalletDB.prototype.resend = function resend(callback) { + var self = this; + + this.getPendingKeys(function(err, keys) { + if (err) + return callback(err); + + if (keys.length > 0) + self.logger.info('Rebroadcasting %d transactions.', keys.length); + + utils.forEachSerial(keys, function(key, next) { + self.db.fetch(key, function(data) { + return bcoin.tx.fromExtended(data); + }, function(err, tx) { + if (err) + return next(err); + + if (!tx) + return next(); + + self.emit('send', tx); + + next(); + }); + }, callback); + }); +}; + /** * Helper function to get a wallet. * @private