wallet/node: refactor node. resend all walletdb txs.

This commit is contained in:
Christopher Jeffrey 2016-09-02 18:57:23 -07:00
parent 0cbab6e72e
commit ae2f90890a
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
7 changed files with 234 additions and 128 deletions

View File

@ -1704,6 +1704,11 @@ ChainDB.prototype.pruneBlock = function pruneBlock(block, callback) {
}); });
}; };
/**
* Chain State
* @constructor
*/
function ChainState() { function ChainState() {
this.tip = constants.ZERO_HASH; this.tip = constants.ZERO_HASH;
this.tx = 0; this.tx = 0;

View File

@ -224,21 +224,6 @@ Fullnode.prototype._init = function _init() {
Fullnode.prototype._open = function open(callback) { Fullnode.prototype._open = function open(callback) {
var self = this; var self = this;
var options;
function done(err) {
if (err)
return callback(err);
self.logger.info('Node is loaded.');
callback();
}
options = {
id: 'primary',
passphrase: this.options.passphrase
};
utils.serial([ utils.serial([
this.chain.open.bind(this.chain), this.chain.open.bind(this.chain),
@ -246,45 +231,25 @@ Fullnode.prototype._open = function open(callback) {
this.miner.open.bind(this.miner), this.miner.open.bind(this.miner),
this.pool.open.bind(this.pool), this.pool.open.bind(this.pool),
this.walletdb.open.bind(this.walletdb), this.walletdb.open.bind(this.walletdb),
function (next) { // Ensure primary wallet.
self.walletdb.ensure(options, function(err, wallet) { this.openWallet.bind(this),
if (err) // Rescan for any missed transactions.
return callback(err); this.rescan.bind(this),
// Rebroadcast pending transactions.
self.logger.info('Loaded wallet with id=%s address=%s', this.resend.bind(this),
wallet.id, wallet.getAddress());
// Set the miner payout address if the
// programmer didn't pass one in.
if (!self.miner.address)
self.miner.address = wallet.getAddress();
self.wallet = wallet;
next();
});
},
function(next) {
if (self.options.noScan) {
self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next);
return next();
}
// Always rescan to make sure we didn't
// miss anything: there is no atomicity
// between the chaindb and walletdb.
self.walletdb.rescan(self.chain.db, next);
},
function(next) {
// Rebroadcast pending transactions.
self.wallet.resend(next);
},
function(next) { function(next) {
if (!self.http) if (!self.http)
return next(); return next();
self.http.open(next); self.http.open(next);
} }
], done); ], function(err) {
if (err)
return callback(err);
self.logger.info('Node is loaded.');
callback();
});
}; };
/** /**
@ -312,6 +277,26 @@ Fullnode.prototype._close = function close(callback) {
], callback); ], callback);
}; };
/**
* Rescan for any missed transactions.
* @param {Function} callback
*/
Fullnode.prototype.rescan = function rescan(callback) {
if (this.options.noScan) {
this.walletdb.setTip(
this.chain.tip.hash,
this.chain.height,
callback);
return;
}
// Always rescan to make sure we didn't
// miss anything: there is no atomicity
// between the chaindb and walletdb.
this.walletdb.rescan(this.chain.db, callback);
};
/** /**
* Broadcast a transaction (note that this will _not_ be verified * Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from * by the mempool - use with care, lest you get banned from

View File

@ -10,6 +10,7 @@
var bcoin = require('../env'); var bcoin = require('../env');
var AsyncObject = require('../utils/async'); var AsyncObject = require('../utils/async');
var utils = require('../utils/utils'); var utils = require('../utils/utils');
var assert = utils.assert;
/** /**
* Base class from which every other * Base class from which every other
@ -36,10 +37,10 @@ function Node(options) {
this.prefix = options.prefix; this.prefix = options.prefix;
this.logger = options.logger; this.logger = options.logger;
this.mempool = null;
this.pool = null;
this.chain = null; this.chain = null;
this.fees = null; this.fees = null;
this.mempool = null;
this.pool = null;
this.miner = null; this.miner = null;
this.walletdb = null; this.walletdb = null;
this.wallet = null; this.wallet = null;
@ -47,14 +48,6 @@ function Node(options) {
this._bound = []; this._bound = [];
if (!this.logger) {
this.logger = new bcoin.logger({
level: options.logLevel || 'none',
console: options.logConsole,
file: options.logFile
});
}
this.__init(); this.__init();
} }
@ -67,6 +60,15 @@ utils.inherits(Node, AsyncObject);
Node.prototype.__init = function __init() { Node.prototype.__init = function __init() {
var self = this; var self = this;
if (!this.logger) {
this.logger = new bcoin.logger({
level: this.options.logLevel || 'none',
console: this.options.logConsole,
file: this.options.logFile
});
}
this.on('preopen', function() { this.on('preopen', function() {
self._onOpen(); self._onOpen();
}); });
@ -225,6 +227,52 @@ Node.prototype.location = function location(name) {
return path; return path;
}; };
/**
* Open and ensure primary wallet.
* @param {Function} callback
*/
Node.prototype.openWallet = function openWallet(callback) {
var self = this;
var options;
assert(!this.wallet);
options = {
id: 'primary',
passphrase: this.options.passphrase
};
this.walletdb.ensure(options, function(err, wallet) {
if (err)
return callback(err);
self.logger.info(
'Loaded wallet with id=%s wid=%d address=%s',
wallet.id, wallet.wid, wallet.getAddress());
// Set the miner payout address if the
// programmer didn't pass one in.
if (self.miner) {
if (!self.miner.address)
self.miner.address = wallet.getAddress();
}
self.wallet = wallet;
callback();
});
};
/**
* Resend all pending transactions.
* @param {Function} callback
*/
Node.prototype.resend = function resend(callback) {
this.walletdb.resend(callback);
};
/* /*
* Expose * Expose
*/ */

View File

@ -148,80 +148,32 @@ SPVNode.prototype._init = function _init() {
SPVNode.prototype._open = function open(callback) { SPVNode.prototype._open = function open(callback) {
var self = this; var self = this;
var options;
function done(err) { utils.serial([
this.chain.open.bind(this.chain),
this.pool.open.bind(this.pool),
this.walletdb.open.bind(this.walletdb),
// Ensure primary wallet.
this.openWallet.bind(this),
// Load bloom filter.
this.openFilter.bind(this),
// Rescan for any missed transactions.
this.rescan.bind(this),
// Rebroadcast pending transactions.
this.resend.bind(this),
function(next) {
if (!self.http)
return next();
self.http.open(next);
}
], function(err) {
if (err) if (err)
return callback(err); return callback(err);
self.logger.info('Node is loaded.'); self.logger.info('Node is loaded.');
callback(); callback();
} });
options = {
id: 'primary',
passphrase: this.options.passphrase
};
// Create or load the primary wallet.
utils.serial([
this.chain.open.bind(this.chain),
this.pool.open.bind(this.pool),
this.walletdb.open.bind(this.walletdb),
function (next) {
self.walletdb.ensure(options, function(err, wallet) {
if (err)
return callback(err);
self.logger.info('Loaded wallet with id=%s address=%s',
wallet.id, wallet.getAddress());
self.wallet = wallet;
next();
});
},
function(next) {
var i;
self.walletdb.getAddressHashes(function(err, hashes) {
if (err)
return next(err);
if (hashes.length > 0)
self.logger.info('Adding %d addresses to filter.', hashes.length);
for (i = 0; i < hashes.length; i++)
self.pool.watch(hashes[i], 'hex');
next();
});
},
function(next) {
if (self.options.noScan) {
self.walletdb.setTip(self.chain.tip.hash, self.chain.height, next);
return next();
}
if (self.walletdb.height === 0)
return next();
// Always replay the last block to make
// sure we didn't miss anything: there
// is no atomicity between the chaindb
// and walletdb.
self.chain.reset(self.walletdb.height - 1, next);
},
function(next) {
// Rebroadcast pending transactions.
self.wallet.resend(next);
},
function(next) {
if (!self.http)
return next();
self.http.open(next);
}
], done);
}; };
/** /**
@ -247,6 +199,54 @@ SPVNode.prototype._close = function close(callback) {
], callback); ], callback);
}; };
/**
* Initialize p2p bloom filter for address watching.
* @param {Function} callback
*/
SPVNode.prototype.openFilter = function openFilter(callback) {
var self = this;
var i;
this.walletdb.getAddressHashes(function(err, hashes) {
if (err)
return callback(err);
if (hashes.length > 0)
self.logger.info('Adding %d addresses to filter.', hashes.length);
for (i = 0; i < hashes.length; i++)
self.pool.watch(hashes[i], 'hex');
callback();
});
};
/**
* Rescan for any missed transactions.
* Note that this will replay the blockchain sync.
* @param {Function} callback
*/
SPVNode.prototype.rescan = function rescan(callback) {
if (this.options.noScan) {
this.walletdb.setTip(
this.chain.tip.hash,
this.chain.height,
callback);
return;
}
if (this.walletdb.height === 0)
return callback();
// Always replay the last block to make
// sure we didn't miss anything: there
// is no atomicity between the chaindb
// and walletdb.
this.chain.reset(this.walletdb.height - 1, callback);
};
/** /**
* Broadcast a transaction (note that this will _not_ be verified * Broadcast a transaction (note that this will _not_ be verified
* by the mempool - use with care, lest you get banned from * by the mempool - use with care, lest you get banned from

View File

@ -51,6 +51,9 @@ layout.txdb = {
prefix: function prefix(wid, key) { prefix: function prefix(wid, key) {
return 't' + pad32(wid) + key; return 't' + pad32(wid) + key;
}, },
pre: function prefix(key) {
return +key.slice(1, 11);
},
hi: function hi(ch, hash, index) { hi: function hi(ch, hash, index) {
return ch + hash + pad32(index); return ch + hash + pad32(index);
}, },

View File

@ -40,6 +40,9 @@ var layout = {
key.copy(out, 5); key.copy(out, 5);
return out; return out;
}, },
pre: function prefix(key) {
return key.readUInt32BE(1, true);
},
hi: function hi(ch, hash, index) { hi: function hi(ch, hash, index) {
var key = new Buffer(37); var key = new Buffer(37);
key[0] = ch; key[0] = ch;
@ -101,7 +104,8 @@ var layout = {
return key; return key;
}, },
haa: function haa(key) { haa: function haa(key) {
return key.toString('hex', 1); key = key.slice(6);
return key.toString('hex', 0);
}, },
t: function t(hash) { t: function t(hash) {
return layout.ha(0x74, hash); return layout.ha(0x74, hash);

View File

@ -296,7 +296,7 @@ WalletDB.prototype.getDepth = function getDepth(callback) {
// walletdb.create by simply seeking to the // walletdb.create by simply seeking to the
// highest wallet wid. // highest wallet wid.
iter = this.db.iterator({ iter = this.db.iterator({
gte: layout.w(0), gte: layout.w(0x00000000),
lte: layout.w(0xffffffff), lte: layout.w(0xffffffff),
reverse: true reverse: true
}); });
@ -1119,6 +1119,67 @@ WalletDB.prototype.rescan = function rescan(chaindb, height, callback) {
}); });
}; };
/**
* Get keys of all pending transactions
* in the wallet db (for resending).
* @param {Function} callback
*/
WalletDB.prototype.getPendingKeys = function getPendingKeys(callback) {
var layout = require('./txdb').layout;
var uniq = {};
this.db.iterate({
gte: layout.prefix(0x00000000, layout.p(constants.NULL_HASH)),
lte: layout.prefix(0xffffffff, layout.p(constants.HIGH_HASH)),
keys: true,
parse: function(key) {
var wid = layout.pre(key);
var hash = layout.pp(key);
if (uniq[hash])
return;
uniq[hash] = true;
return layout.prefix(wid, layout.t(hash));
}
}, callback);
};
/**
* Resend all pending transactions.
* @param {Function} callback
*/
WalletDB.prototype.resend = function resend(callback) {
var self = this;
this.getPendingKeys(function(err, keys) {
if (err)
return callback(err);
if (keys.length > 0)
self.logger.info('Rebroadcasting %d transactions.', keys.length);
utils.forEachSerial(keys, function(key, next) {
self.db.fetch(key, function(data) {
return bcoin.tx.fromExtended(data);
}, function(err, tx) {
if (err)
return next(err);
if (!tx)
return next();
self.emit('send', tx);
next();
});
}, callback);
});
};
/** /**
* Helper function to get a wallet. * Helper function to get a wallet.
* @private * @private