walletdb: sync state object refactor.

This commit is contained in:
Christopher Jeffrey 2016-10-28 22:34:01 -07:00
parent 45ad99c8f5
commit b74c32cc4b
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
5 changed files with 244 additions and 77 deletions

View File

@ -188,6 +188,12 @@ layout.txdb = {
}, },
r: function r(hash) { r: function r(hash) {
return this.ha('r', hash); return this.ha('r', hash);
},
b: function b(height) {
return 'b' + pad32(height);
},
bb: function bb(key) {
return +key.slice(1);
} }
}; };

View File

@ -19,9 +19,9 @@ var BufferWriter = require('../utils/writer');
* @param {Number} height * @param {Number} height
*/ */
function SyncState() { function ChainState() {
if (!(this instanceof SyncState)) if (!(this instanceof ChainState))
return new SyncState(); return new ChainState();
this.start = new HeaderRecord(); this.start = new HeaderRecord();
this.tip = new HeaderRecord(); this.tip = new HeaderRecord();
@ -29,13 +29,13 @@ function SyncState() {
/** /**
* Clone the block. * Clone the block.
* @returns {SyncState} * @returns {ChainState}
*/ */
SyncState.prototype.clone = function clone() { ChainState.prototype.clone = function clone() {
var state = new SyncState(); var state = new ChainState();
state.start = this.start.clone(); state.start = this.start;
state.tip = this.tip.clone(); state.tip = this.tip;
return state; return state;
}; };
@ -45,7 +45,7 @@ SyncState.prototype.clone = function clone() {
* @param {Buffer} data * @param {Buffer} data
*/ */
SyncState.prototype.fromRaw = function fromRaw(data) { ChainState.prototype.fromRaw = function fromRaw(data) {
var p = new BufferReader(data); var p = new BufferReader(data);
this.start.fromRaw(p); this.start.fromRaw(p);
this.tip.fromRaw(p); this.tip.fromRaw(p);
@ -56,11 +56,11 @@ SyncState.prototype.fromRaw = function fromRaw(data) {
* Instantiate wallet block from serialized data. * Instantiate wallet block from serialized data.
* @param {Hash} hash * @param {Hash} hash
* @param {Buffer} data * @param {Buffer} data
* @returns {SyncState} * @returns {ChainState}
*/ */
SyncState.fromRaw = function fromRaw(data) { ChainState.fromRaw = function fromRaw(data) {
return new SyncState().fromRaw(data); return new ChainState().fromRaw(data);
}; };
/** /**
@ -68,7 +68,7 @@ SyncState.fromRaw = function fromRaw(data) {
* @returns {Buffer} * @returns {Buffer}
*/ */
SyncState.prototype.toRaw = function toRaw(writer) { ChainState.prototype.toRaw = function toRaw(writer) {
var p = new BufferWriter(writer); var p = new BufferWriter(writer);
this.start.toRaw(p); this.start.toRaw(p);
@ -432,7 +432,7 @@ function serializeWallets(wids) {
* Expose * Expose
*/ */
exports.SyncState = SyncState; exports.ChainState = ChainState;
exports.HeaderRecord = HeaderRecord; exports.HeaderRecord = HeaderRecord;
exports.BlockMapRecord = BlockMapRecord; exports.BlockMapRecord = BlockMapRecord;
exports.TXMapRecord = TXMapRecord; exports.TXMapRecord = TXMapRecord;

View File

@ -190,6 +190,15 @@ var layout = {
}, },
r: function r(hash) { r: function r(hash) {
return layout.ha(0x72, hash); return layout.ha(0x72, hash);
},
b: function b(height) {
var key = new Buffer(5);
key[0] = 0x62;
key.writeUInt32BE(height, 1, true);
return key;
},
bb: function bb(key) {
return key.readUInt32BE(1, true);
} }
}; };
@ -957,6 +966,71 @@ TXDB.prototype.removeBlockRecord = co(function* removeBlockRecord(tx, height) {
this.walletdb.writeBlockMap(this.wallet, height, block); this.walletdb.writeBlockMap(this.wallet, height, block);
}); });
/**
* Get block record.
* @param {Number} height
* @returns {Promise}
*/
TXDB.prototype.getBlock = co(function* getBlock(height) {
var data = yield this.db.get(layout.b(height));
if (!data)
return;
return BlockRecord.fromRaw(data);
});
/**
* Append to the global block record.
* @param {TX} tx
* @param {Number} height
* @returns {Promise}
*/
TXDB.prototype.addBlock = co(function* addBlock(tx, entry) {
var hash = tx.hash('hex');
var block = yield this.getBlock(entry.height);
if (!block)
block = new BlockRecord(entry.hash, entry.height, entry.ts);
if (block.hashes.indexOf(hash) !== -1)
return;
block.hashes.push(hash);
this.put(layout.b(entry.height), block.toRaw());
});
/**
* Remove from the global block record.
* @param {TX}
* @param {Number} height
* @returns {Promise}
*/
TXDB.prototype.removeBlock = co(function* removeBlock(tx, height) {
var hash = tx.hash('hex');
var block = yield this.getBlock(height);
var index;
if (!block)
return;
index = block.hashes.indexOf(hash);
if (index === -1)
return;
block.hashes.splice(index, 1);
if (block.hashes.length === 0) {
this.del(layout.b(height));
return;
}
this.put(layout.b(height), block.toRaw());
});
/** /**
* Add transaction, potentially runs * Add transaction, potentially runs
* `confirm()` and `removeConflicts()`. * `confirm()` and `removeConflicts()`.
@ -3268,6 +3342,89 @@ DetailsMember.prototype.toJSON = function toJSON(network) {
}; };
}; };
/**
* Block Record
* @constructor
* @param {Hash} hash
* @param {Number} height
* @param {Number} ts
*/
function BlockRecord(hash, height, ts) {
if (!(this instanceof BlockRecord))
return new BlockRecord(hash, height, ts);
this.hash = hash || constants.NULL_HASH;
this.height = height != null ? height : -1;
this.ts = ts || 0;
this.hashes = [];
}
/**
* Instantiate wallet block from serialized tip data.
* @private
* @param {Buffer} data
*/
BlockRecord.prototype.fromRaw = function fromRaw(data) {
var p = new BufferReader(data);
this.hash = p.readHash('hex');
this.height = p.readU32();
this.ts = p.readU32();
while (p.left())
this.hashes.push(p.readHash('hex'));
return this;
};
/**
* Instantiate wallet block from serialized data.
* @param {Hash} hash
* @param {Buffer} data
* @returns {BlockRecord}
*/
BlockRecord.fromRaw = function fromRaw(data) {
return new BlockRecord().fromRaw(data);
};
/**
* Serialize the wallet block as a tip (hash and height).
* @returns {Buffer}
*/
BlockRecord.prototype.toRaw = function toRaw(writer) {
var p = new BufferWriter(writer);
var i;
p.writeHash(this.hash);
p.writeU32(this.height);
p.writeU32(this.ts);
for (i = 0; i < this.hashes.length; i++)
p.writeHash(this.hashes[i]);
if (!writer)
p = p.render();
return p;
};
/**
* Convert the block to a more json-friendly object.
* @returns {Object}
*/
BlockRecord.prototype.toJSON = function toJSON() {
return {
hash: utils.revHex(this.hash),
height: this.height,
ts: this.ts,
hashes: this.hashes.map(utils.revHex)
};
};
/* /*
* Helpers * Helpers
*/ */

View File

@ -1974,21 +1974,6 @@ Wallet.prototype.unconfirm = co(function* unconfirm(hash) {
} }
}); });
/**
* Confirm a wallet transcation.
* @param {Hash} hash
* @returns {Promise}
*/
Wallet.prototype.confirm = co(function* confirm(hash, block) {
var unlock = yield this.writeLock.lock();
try {
return yield this.txdb.confirm(hash, block);
} finally {
unlock();
}
});
/** /**
* Remove a wallet transaction. * Remove a wallet transaction.
* @param {Hash} hash * @param {Hash} hash

View File

@ -24,7 +24,7 @@ var Bloom = require('../utils/bloom');
var Logger = require('../node/logger'); var Logger = require('../node/logger');
var TX = require('../primitives/tx'); var TX = require('../primitives/tx');
var records = require('./records'); var records = require('./records');
var SyncState = records.SyncState; var ChainState = records.ChainState;
var BlockMapRecord = records.BlockMapRecord; var BlockMapRecord = records.BlockMapRecord;
var HeaderRecord = records.HeaderRecord; var HeaderRecord = records.HeaderRecord;
var PathMapRecord = records.PathMapRecord; var PathMapRecord = records.PathMapRecord;
@ -166,7 +166,7 @@ function WalletDB(options) {
this.logger = options.logger || Logger.global; this.logger = options.logger || Logger.global;
this.client = options.client; this.client = options.client;
this.state = new SyncState(); this.state = new ChainState();
this.depth = 0; this.depth = 0;
this.wallets = {}; this.wallets = {};
this.genesis = HeaderRecord.fromEntry(this.network.genesis); this.genesis = HeaderRecord.fromEntry(this.network.genesis);
@ -233,8 +233,10 @@ WalletDB.prototype._open = co(function* open() {
yield this.resend(); yield this.resend();
this.logger.info( this.logger.info(
'WalletDB loaded (depth=%d, height=%d).', 'WalletDB loaded (depth=%d, height=%d, start=%d).',
this.depth, this.state.tip.height); this.depth,
this.state.tip.height,
this.state.start.height);
}); });
/** /**
@ -265,7 +267,7 @@ WalletDB.prototype._close = co(function* close() {
WalletDB.prototype.watch = co(function* watch() { WalletDB.prototype.watch = co(function* watch() {
var hashes = yield this.getFilterHashes(); var hashes = yield this.getFilterHashes();
this.logger.info('Adding %d hashes to filter.', hashes.length); this.logger.info('Adding %d hashes to WalletDB filter.', hashes.length);
this.addFilter(hashes); this.addFilter(hashes);
}); });
@ -363,14 +365,16 @@ WalletDB.prototype.scan = co(function* scan(height) {
if (!this.client) if (!this.client)
return; return;
assert(utils.isUInt32(height), 'Must pass in a height.'); assert(utils.isUInt32(height), 'WDB: Must pass in a height.');
if (height > this.state.tip.height) if (height > this.state.tip.height)
throw new Error('Cannot rescan future blocks.'); throw new Error('WDB: Cannot rescan future blocks.');
yield this.rollback(height); yield this.rollback(height);
this.logger.info('Scanning %d blocks.', this.state.tip.height - height); this.logger.info(
'WalletDB is scanning %d blocks.',
this.state.tip.height - height + 1);
yield this.client.scan(this.state.tip.hash, this.filter, function(block, txs) { yield this.client.scan(this.state.tip.hash, this.filter, function(block, txs) {
return self._addBlock(block, txs); return self._addBlock(block, txs);
@ -440,7 +444,7 @@ WalletDB.prototype.wipe = co(function* wipe() {
var dummy = new Buffer(0); var dummy = new Buffer(0);
var i, keys, key, gte, lte; var i, keys, key, gte, lte;
this.logger.warning('Wiping txdb...'); this.logger.warning('Wiping WalletDB TXDB...');
this.logger.warning('I hope you know what you\'re doing.'); this.logger.warning('I hope you know what you\'re doing.');
keys = yield this.db.keys({ keys = yield this.db.keys({
@ -549,7 +553,7 @@ WalletDB.prototype.getDepth = co(function* getDepth() {
*/ */
WalletDB.prototype.start = function start(wallet) { WalletDB.prototype.start = function start(wallet) {
assert(!wallet.current, 'Batch already started.'); assert(!wallet.current, 'WDB: Batch already started.');
wallet.current = this.db.batch(); wallet.current = this.db.batch();
wallet.accountCache.start(); wallet.accountCache.start();
wallet.pathCache.start(); wallet.pathCache.start();
@ -591,7 +595,7 @@ WalletDB.prototype.clear = function clear(wallet) {
*/ */
WalletDB.prototype.batch = function batch(wallet) { WalletDB.prototype.batch = function batch(wallet) {
assert(wallet.current, 'Batch does not exist.'); assert(wallet.current, 'WDB: Batch does not exist.');
return wallet.current; return wallet.current;
}; };
@ -808,10 +812,10 @@ WalletDB.prototype._rename = co(function* _rename(wallet, id) {
var i, paths, path, batch; var i, paths, path, batch;
if (!utils.isName(id)) if (!utils.isName(id))
throw new Error('Bad wallet ID.'); throw new Error('WDB: Bad wallet ID.');
if (yield this.has(id)) if (yield this.has(id))
throw new Error('ID not available.'); throw new Error('WDB: ID not available.');
batch = this.start(wallet); batch = this.start(wallet);
batch.del(layout.l(old)); batch.del(layout.l(old));
@ -861,13 +865,13 @@ WalletDB.prototype.auth = co(function* auth(wid, token) {
if (typeof token === 'string') { if (typeof token === 'string') {
if (!utils.isHex256(token)) if (!utils.isHex256(token))
throw new Error('Authentication error.'); throw new Error('WDB: Authentication error.');
token = new Buffer(token, 'hex'); token = new Buffer(token, 'hex');
} }
// Compare in constant time: // Compare in constant time:
if (!crypto.ccmp(token, wallet.token)) if (!crypto.ccmp(token, wallet.token))
throw new Error('Authentication error.'); throw new Error('WDB: Authentication error.');
return wallet; return wallet;
}); });
@ -903,7 +907,7 @@ WalletDB.prototype._create = co(function* create(options) {
var wallet; var wallet;
if (exists) if (exists)
throw new Error('Wallet already exists.'); throw new Error('WDB: Wallet already exists.');
wallet = Wallet.fromOptions(this, options); wallet = Wallet.fromOptions(this, options);
wallet.wid = this.depth++; wallet.wid = this.depth++;
@ -912,7 +916,7 @@ WalletDB.prototype._create = co(function* create(options) {
this.register(wallet); this.register(wallet);
this.logger.info('Created wallet %s.', wallet.id); this.logger.info('Created wallet %s in WalletDB.', wallet.id);
return wallet; return wallet;
}); });
@ -1412,7 +1416,7 @@ WalletDB.prototype.resend = co(function* resend() {
var i, key, data, tx; var i, key, data, tx;
if (keys.length > 0) if (keys.length > 0)
this.logger.info('Rebroadcasting %d transactions.', keys.length); this.logger.info('Rebroadcasting %d WalletDB transactions.', keys.length);
for (i = 0; i < keys.length; i++) { for (i = 0; i < keys.length; i++) {
key = keys[i]; key = keys[i];
@ -1536,6 +1540,10 @@ WalletDB.prototype.init = co(function* init() {
tip = this.genesis; tip = this.genesis;
} }
this.logger.info(
'Initializing WalletDB chain state at %s (%d).',
utils.revHex(tip.hash), tip.height);
yield this.syncState(tip, true); yield this.syncState(tip, true);
}); });
@ -1550,7 +1558,7 @@ WalletDB.prototype.getState = co(function* getState() {
if (!data) if (!data)
return; return;
return SyncState.fromRaw(data); return ChainState.fromRaw(data);
}); });
/** /**
@ -1698,29 +1706,35 @@ WalletDB.prototype.getTXMap = co(function* getTXMap(hash) {
*/ */
WalletDB.prototype.rollback = co(function* rollback(height) { WalletDB.prototype.rollback = co(function* rollback(height) {
var tip, start; var tip, blocks;
if (this.state.tip.height <= height) if (this.state.tip.height <= height)
return; return;
this.logger.info( this.logger.info(
'Rolling back %d blocks to height %d.', 'Rolling back %d WalletDB blocks to height %d.',
this.state.tip.height - height, height); this.state.tip.height - height, height);
tip = yield this.getHeader(height); tip = yield this.getHeader(height);
if (!tip) { if (!tip) {
blocks = this.state.tip.height - height;
if (blocks < this.keepBlocks)
throw new Error('WDB: Block not found for rollback.');
if (height >= this.state.start.height) { if (height >= this.state.start.height) {
yield this.revert(this.state.start.height); yield this.revert(this.state.start.height);
yield this.syncState(this.state.start, true); yield this.syncState(this.state.start, true);
this.logger.warning( this.logger.warning(
'Wallet rolled back to start block (%d).', 'WalletDB rolled back to start block (%d).',
this.state.tip.height); this.state.tip.height);
} else { } else {
yield this.revert(0); yield this.revert(0);
yield this.syncState(this.genesis, true); yield this.syncState(this.genesis, true);
this.logger.warning('Wallet rolled back to genesis block.'); this.logger.warning('WalletDB rolled back to genesis block.');
} }
return; return;
} }
@ -1729,7 +1743,7 @@ WalletDB.prototype.rollback = co(function* rollback(height) {
}); });
/** /**
* Sync with chain height. * Revert TXDB to an older state.
* @param {Number} height * @param {Number} height
* @returns {Promise} * @returns {Promise}
*/ */
@ -1764,7 +1778,7 @@ WalletDB.prototype.revert = co(function* revert(height) {
} }
} }
this.logger.info('Rolled back %d transactions.', total); this.logger.info('Rolled back %d WalletDB transactions.', total);
}); });
/** /**
@ -1794,14 +1808,22 @@ WalletDB.prototype._addBlock = co(function* addBlock(entry, txs) {
var i, tip, tx; var i, tip, tx;
if (entry.height < this.state.tip.height) { if (entry.height < this.state.tip.height) {
this.logger.warning('Wallet is connecting low blocks.'); this.logger.warning(
'WalletDB is connecting low blocks (%d).',
entry.height);
return total; return total;
} }
if (entry.height === this.state.tip.height) { if (entry.height === this.state.tip.height) {
this.logger.warning('Wallet is connecting low blocks.'); // We let blocks of the same height
// through specifically for rescans:
// we always want to rescan the last
// block since the state may have
// updated before the block was fully
// processed (in the case of a crash).
this.logger.warning('Duplicate connection for %d.', entry.height);
} else if (entry.height !== this.state.tip.height + 1) { } else if (entry.height !== this.state.tip.height + 1) {
throw new Error('Bad connection (height mismatch).'); throw new Error('WDB: Bad connection (height mismatch).');
} }
tip = HeaderRecord.fromEntry(entry); tip = HeaderRecord.fromEntry(entry);
@ -1820,7 +1842,7 @@ WalletDB.prototype._addBlock = co(function* addBlock(entry, txs) {
} }
if (total > 0) { if (total > 0) {
this.logger.info('Connected block %s (tx=%d).', this.logger.info('Connected WalletDB block %s (tx=%d).',
utils.revHex(tip.hash), total); utils.revHex(tip.hash), total);
} }
@ -1851,27 +1873,24 @@ WalletDB.prototype.removeBlock = co(function* removeBlock(entry) {
*/ */
WalletDB.prototype._removeBlock = co(function* removeBlock(entry) { WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
var i, tx, tip, prev, block; var i, tx, prev, block;
if (entry.height > this.state.tip.height) { if (entry.height > this.state.tip.height) {
this.logger.warning('Wallet is disconnecting high blocks.'); this.logger.warning(
'WalletDB is disconnecting high blocks (%d).',
entry.height);
return 0; return 0;
} }
if (entry.height !== this.state.tip.height) if (entry.height !== this.state.tip.height)
throw new Error('Bad disconnection (height mismatch).'); throw new Error('WDB: Bad disconnection (height mismatch).');
tip = yield this.getHeader(entry.height);
if (!tip)
throw new Error('Bad disconnection (block not found).');
prev = yield this.getHeader(entry.height - 1); prev = yield this.getHeader(entry.height - 1);
if (!prev) if (!prev)
throw new Error('Bad disconnection (no previous block).'); throw new Error('WDB: Bad disconnection (no previous block).');
block = yield this.getBlockMap(tip.height); block = yield this.getBlockMap(entry.height);
if (!block) { if (!block) {
yield this.syncState(prev); yield this.syncState(prev);
@ -1880,13 +1899,13 @@ WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
for (i = block.txs.length - 1; i >= 0; i--) { for (i = block.txs.length - 1; i >= 0; i--) {
tx = block.txs[i]; tx = block.txs[i];
yield this._unconfirm(tx, tip); yield this._unconfirm(tx);
} }
yield this.syncState(prev); yield this.syncState(prev);
this.logger.warning('Disconnected block %s (tx=%d).', this.logger.warning('Disconnected wallet block %s (tx=%d).',
utils.revHex(tip.hash), block.txs.length); utils.revHex(entry.hash), block.txs.length);
return block.txs.length; return block.txs.length;
}); });
@ -1908,12 +1927,12 @@ WalletDB.prototype.addTX = co(function* addTX(tx) {
entry = yield this.getHeader(tx.height); entry = yield this.getHeader(tx.height);
if (!entry) if (!entry)
throw new Error('Inserting unconfirmed transaction.'); throw new Error('WDB: Inserting unconfirmed transaction.');
if (tx.block !== entry.hash) if (tx.block !== entry.hash)
throw new Error('Inserting unconfirmed transaction.'); throw new Error('WDB: Inserting unconfirmed transaction.');
this.logger.warning('Retroactively inserting confirmed transaction.'); this.logger.warning('WalletDB is inserting confirmed transaction.');
} }
return yield this._insert(tx); return yield this._insert(tx);
@ -1934,7 +1953,7 @@ WalletDB.prototype._insert = co(function* insert(tx, block) {
var result = false; var result = false;
var i, wids, wid, wallet; var i, wids, wid, wallet;
assert(!tx.mutable, 'Cannot add mutable TX to wallet.'); assert(!tx.mutable, 'WDB: Cannot add mutable TX.');
wids = yield this.getWalletsByInsert(tx); wids = yield this.getWalletsByInsert(tx);
@ -1942,7 +1961,7 @@ WalletDB.prototype._insert = co(function* insert(tx, block) {
return; return;
this.logger.info( this.logger.info(
'Incoming transaction for %d wallets (%s).', 'Incoming transaction for %d wallets in WalletDB (%s).',
wids.length, tx.rhash); wids.length, tx.rhash);
for (i = 0; i < wids.length; i++) { for (i = 0; i < wids.length; i++) {
@ -1953,7 +1972,7 @@ WalletDB.prototype._insert = co(function* insert(tx, block) {
if (yield wallet.add(tx, block)) { if (yield wallet.add(tx, block)) {
this.logger.info( this.logger.info(
'Added transaction to wallet: %s (%d).', 'Added transaction to wallet in WalletDB: %s (%d).',
wallet.id, wid); wallet.id, wid);
result = true; result = true;
} }