walletdb: sync state and utxo handling refactor.

This commit is contained in:
Christopher Jeffrey 2016-10-29 16:27:01 -07:00
parent b74c32cc4b
commit 3a89e627b8
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
8 changed files with 481 additions and 236 deletions

View File

@ -1244,17 +1244,17 @@ HTTPServer.prototype._initIO = function _initIO() {
callback();
});
socket.on('watch hash', function(args, callback) {
var hashes = args[0];
socket.on('watch data', function(args, callback) {
var chunks = args[0];
if (!Array.isArray(hashes))
if (!Array.isArray(chunks))
return callback({ error: 'Invalid parameter.' });
if (!socket.api)
return callback({ error: 'Not authorized.' });
try {
socket.addFilter(hashes);
socket.addFilter(chunks);
} catch (e) {
return callback({ error: e.message });
}
@ -1503,22 +1503,22 @@ ClientSocket.prototype._init = function _init() {
});
};
ClientSocket.prototype.addFilter = function addFilter(hashes) {
var i, hash;
ClientSocket.prototype.addFilter = function addFilter(chunks) {
var i, data;
if (!this.filter)
this.filter = Bloom.fromRate(100000, 0.001, -1);
for (i = 0; i < hashes.length; i++) {
hash = Address.getHash(hashes[i], 'hex');
for (i = 0; i < chunks.length; i++) {
data = chunks[i];
if (!hash)
throw new Error('Bad hash.');
if (!utils.isHex(data))
throw new Error('Not a hex string.');
this.filter.add(hash, 'hex');
this.filter.add(data, 'hex');
if (this.pool.options.spv)
this.pool.watch(hash, 'hex');
this.pool.watch(data, 'hex');
}
};
@ -1625,20 +1625,12 @@ ClientSocket.prototype.testFilterFull = function testFilterFull(tx) {
};
ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) {
var found = false;
var i, hash, input, prevout, output, outpoint;
if (!this.filter)
return false;
if (!tx.isCoinbase()) {
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
prevout = input.prevout;
if (this.filter.test(prevout.toRaw()))
return true;
}
}
for (i = 0; i < tx.outputs.length; i++) {
output = tx.outputs[i];
hash = output.getHash();
@ -1649,7 +1641,19 @@ ClientSocket.prototype.testFilterSPV = function testFilterSPV(tx) {
if (this.filter.test(hash)) {
outpoint = Outpoint.fromTX(tx, i);
this.filter.add(outpoint.toRaw());
return true;
found = true;
}
}
if (found)
return true;
if (!tx.isCoinbase()) {
for (i = 0; i < tx.inputs.length; i++) {
input = tx.inputs[i];
prevout = input.prevout;
if (this.filter.test(prevout.toRaw()))
return true;
}
}

View File

@ -278,12 +278,12 @@ FullNode.prototype._close = co(function* close() {
});
/**
* Watch address or tx hashes (nop).
* @param {Hash[]} hashes
* Watch address or outpoints (nop).
* @param {Hash[]} chunks
* @returns {Promise}
*/
FullNode.prototype.watchHash = function watchHash(hashes) {
FullNode.prototype.watchData = function watchData(chunks) {
return Promise.resolve();
};

View File

@ -195,18 +195,18 @@ SPVNode.prototype._close = co(function* close() {
});
/**
* Watch address hashes.
* @param {Hash[]} hashes
* Watch address hashes or outpoints.
* @param {Hash[]} chunks
* @returns {Promise}
*/
SPVNode.prototype.watchHash = function watchHash(hashes) {
SPVNode.prototype.watchData = function watchData(chunks) {
var i;
this.logger.info('Adding %d addresses to filter.', hashes.length);
this.logger.info('Adding %d addresses to filter.', chunks.length);
for (i = 0; i < hashes.length; i++)
this.pool.watch(hashes[i], 'hex');
for (i = 0; i < chunks.length; i++)
this.pool.watch(chunks[i], 'hex');
return Promise.resolve();
};

View File

@ -62,6 +62,12 @@ layout.walletdb = {
},
ee: function ee(key) {
return key.slice(1);
},
o: function o(hash) {
return 'o' + hash + pad32(index);
},
oo: function oo(key) {
return [key.slice(1, 65), +key.slice(65)];
}
};

View File

@ -13,47 +13,48 @@ var BufferReader = require('../utils/reader');
var BufferWriter = require('../utils/writer');
/**
* Wallet Tip
* Chain State
* @constructor
* @param {Hash} hash
* @param {Number} height
*/
function ChainState() {
if (!(this instanceof ChainState))
return new ChainState();
this.start = new HeaderRecord();
this.tip = new HeaderRecord();
this.startHeight = -1;
this.startHash = constants.NULL_HASH;
this.height = -1;
}
/**
* Clone the block.
* Clone the state.
* @returns {ChainState}
*/
ChainState.prototype.clone = function clone() {
var state = new ChainState();
state.start = this.start;
state.tip = this.tip;
state.startHeight = this.startHeight;
state.startHash = this.startHash;
state.height = this.height;
return state;
};
/**
* Instantiate wallet block from serialized tip data.
* Inject properties from serialized data.
* @private
* @param {Buffer} data
*/
ChainState.prototype.fromRaw = function fromRaw(data) {
var p = new BufferReader(data);
this.start.fromRaw(p);
this.tip.fromRaw(p);
this.startHeight = p.readU32();
this.startHash = p.readHash('hex');
this.height = p.readU32();
return this;
};
/**
* Instantiate wallet block from serialized data.
* Instantiate chain state from serialized data.
* @param {Hash} hash
* @param {Buffer} data
* @returns {ChainState}
@ -64,15 +65,16 @@ ChainState.fromRaw = function fromRaw(data) {
};
/**
* Serialize the wallet block as a tip (hash and height).
* Serialize the chain state.
* @returns {Buffer}
*/
ChainState.prototype.toRaw = function toRaw(writer) {
var p = new BufferWriter(writer);
this.start.toRaw(p);
this.tip.toRaw(p);
p.writeU32(this.startHeight);
p.writeHash(this.startHash);
p.writeU32(this.height);
if (!writer)
p = p.render();
@ -81,15 +83,16 @@ ChainState.prototype.toRaw = function toRaw(writer) {
};
/**
* Wallet Tip
* Block Meta
* @constructor
* @param {Hash} hash
* @param {Number} height
* @param {Number} ts
*/
function HeaderRecord(hash, height, ts) {
if (!(this instanceof HeaderRecord))
return new HeaderRecord(hash, height, ts);
function BlockMeta(hash, height, ts) {
if (!(this instanceof BlockMeta))
return new BlockMeta(hash, height, ts);
this.hash = hash || constants.NULL_HASH;
this.height = height != null ? height : -1;
@ -98,20 +101,29 @@ function HeaderRecord(hash, height, ts) {
/**
* Clone the block.
* @returns {HeaderRecord}
* @returns {BlockMeta}
*/
HeaderRecord.prototype.clone = function clone() {
return new HeaderRecord(this.hash, this.height, this.ts);
BlockMeta.prototype.clone = function clone() {
return new BlockMeta(this.hash, this.height, this.ts);
};
/**
* Instantiate wallet block from chain entry.
* Get block meta hash as a buffer.
* @returns {Buffer}
*/
BlockMeta.prototype.toHash = function toHash() {
return new Buffer(this.hash, 'hex');
};
/**
* Instantiate block meta from chain entry.
* @private
* @param {ChainEntry} entry
*/
HeaderRecord.prototype.fromEntry = function fromEntry(entry) {
BlockMeta.prototype.fromEntry = function fromEntry(entry) {
this.hash = entry.hash;
this.height = entry.height;
this.ts = entry.ts;
@ -119,12 +131,12 @@ HeaderRecord.prototype.fromEntry = function fromEntry(entry) {
};
/**
* Instantiate wallet block from json object.
* Instantiate block meta from json object.
* @private
* @param {Object} json
*/
HeaderRecord.prototype.fromJSON = function fromJSON(json) {
BlockMeta.prototype.fromJSON = function fromJSON(json) {
this.hash = utils.revHex(json.hash);
this.height = json.height;
this.ts = json.ts;
@ -132,12 +144,12 @@ HeaderRecord.prototype.fromJSON = function fromJSON(json) {
};
/**
* Instantiate wallet block from serialized tip data.
* Instantiate block meta from serialized tip data.
* @private
* @param {Buffer} data
*/
HeaderRecord.prototype.fromRaw = function fromRaw(data) {
BlockMeta.prototype.fromRaw = function fromRaw(data) {
var p = new BufferReader(data);
this.hash = p.readHash('hex');
this.height = p.readU32();
@ -146,42 +158,42 @@ HeaderRecord.prototype.fromRaw = function fromRaw(data) {
};
/**
* Instantiate wallet block from chain entry.
* Instantiate block meta from chain entry.
* @param {ChainEntry} entry
* @returns {HeaderRecord}
* @returns {BlockMeta}
*/
HeaderRecord.fromEntry = function fromEntry(entry) {
return new HeaderRecord().fromEntry(entry);
BlockMeta.fromEntry = function fromEntry(entry) {
return new BlockMeta().fromEntry(entry);
};
/**
* Instantiate wallet block from json object.
* Instantiate block meta from json object.
* @param {Object} json
* @returns {HeaderRecord}
* @returns {BlockMeta}
*/
HeaderRecord.fromJSON = function fromJSON(json) {
return new HeaderRecord().fromJSON(json);
BlockMeta.fromJSON = function fromJSON(json) {
return new BlockMeta().fromJSON(json);
};
/**
* Instantiate wallet block from serialized data.
* Instantiate block meta from serialized data.
* @param {Hash} hash
* @param {Buffer} data
* @returns {HeaderRecord}
* @returns {BlockMeta}
*/
HeaderRecord.fromRaw = function fromRaw(data) {
return new HeaderRecord().fromRaw(data);
BlockMeta.fromRaw = function fromRaw(data) {
return new BlockMeta().fromRaw(data);
};
/**
* Serialize the wallet block as a tip (hash and height).
* Serialize the block meta.
* @returns {Buffer}
*/
HeaderRecord.prototype.toRaw = function toRaw(writer) {
BlockMeta.prototype.toRaw = function toRaw(writer) {
var p = new BufferWriter(writer);
p.writeHash(this.hash);
@ -195,11 +207,11 @@ HeaderRecord.prototype.toRaw = function toRaw(writer) {
};
/**
* Convert the block to a more json-friendly object.
* Convert the block meta to a more json-friendly object.
* @returns {Object}
*/
HeaderRecord.prototype.toJSON = function toJSON() {
BlockMeta.prototype.toJSON = function toJSON() {
return {
hash: utils.revHex(this.hash),
height: this.height,
@ -363,6 +375,38 @@ TXMapRecord.fromRaw = function fromRaw(hash, data) {
return new TXMapRecord(hash).fromRaw(data);
};
/**
* Outpoint Map
* @constructor
*/
function OutpointMapRecord(hash, index, wids) {
this.hash = hash || constants.NULL_HASH;
this.index = index != null ? index : -1;
this.wids = wids || [];
}
OutpointMapRecord.prototype.add = function add(wid) {
return utils.binaryInsert(this.wids, wid, cmp, true) !== -1;
};
OutpointMapRecord.prototype.remove = function remove(wid) {
return utils.binaryRemove(this.wids, wid, cmp);
};
OutpointMapRecord.prototype.toRaw = function toRaw() {
return serializeWallets(this.wids);
};
OutpointMapRecord.prototype.fromRaw = function fromRaw(data) {
this.wids = parseWallets(data);
return this;
};
OutpointMapRecord.fromRaw = function fromRaw(hash, index, data) {
return new OutpointMapRecord(hash, index).fromRaw(data);
};
/**
* Path Record
* @constructor
@ -433,9 +477,10 @@ function serializeWallets(wids) {
*/
exports.ChainState = ChainState;
exports.HeaderRecord = HeaderRecord;
exports.BlockMeta = BlockMeta;
exports.BlockMapRecord = BlockMapRecord;
exports.TXMapRecord = TXMapRecord;
exports.OutpointMapRecord = OutpointMapRecord;
exports.PathMapRecord = PathMapRecord;
module.exports = exports;

View File

@ -20,6 +20,7 @@ var Outpoint = require('../primitives/outpoint');
var records = require('./records');
var BlockMapRecord = records.BlockMapRecord;
var TXMapRecord = records.TXMapRecord;
var OutpointMapRecord = records.OutpointMapRecord;
var DUMMY = new Buffer([0]);
/*
@ -687,14 +688,18 @@ TXDB.prototype.resolveOutputs = co(function* resolveOutputs(tx, block, resolved)
* @param {Path} path
*/
TXDB.prototype.saveCredit = function saveCredit(credit, path) {
var prevout = credit.coin;
var key = prevout.hash + prevout.index;
TXDB.prototype.saveCredit = co(function* saveCredit(credit, path) {
var coin = credit.coin;
var key = coin.hash + coin.index;
var raw = credit.toRaw();
this.put(layout.c(prevout.hash, prevout.index), raw);
this.put(layout.C(path.account, prevout.hash, prevout.index), DUMMY);
yield this.addOutpointMap(coin.hash, coin.index);
this.put(layout.c(coin.hash, coin.index), raw);
this.put(layout.C(path.account, coin.hash, coin.index), DUMMY);
this.coinCache.push(key, raw);
};
});
/**
* Remove credit.
@ -702,13 +707,17 @@ TXDB.prototype.saveCredit = function saveCredit(credit, path) {
* @param {Path} path
*/
TXDB.prototype.removeCredit = function removeCredit(credit, path) {
var prevout = credit.coin;
var key = prevout.hash + prevout.index;
this.del(layout.c(prevout.hash, prevout.index));
this.del(layout.C(path.account, prevout.hash, prevout.index));
TXDB.prototype.removeCredit = co(function* removeCredit(credit, path) {
var coin = credit.coin;
var key = coin.hash + coin.index;
yield this.removeOutpointMap(coin.hash, coin.index);
this.del(layout.c(coin.hash, coin.index));
this.del(layout.C(path.account, coin.hash, coin.index));
this.coinCache.unpush(key);
};
});
/**
* Spend credit.
@ -799,7 +808,7 @@ TXDB.prototype.resolveInput = co(function* resolveInput(tx, index, path) {
// it retroactively.
if (stx.height === -1) {
credit.spent = true;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
if (tx.height !== -1)
this.pending.confirmed += credit.coin.value;
}
@ -884,7 +893,7 @@ TXDB.prototype.isSpent = function isSpent(hash, index) {
* @returns {Promise}
*/
TXDB.prototype.addTXRecord = co(function* addTXRecord(tx) {
TXDB.prototype.addTXMap = co(function* addTXMap(tx) {
var hash = tx.hash('hex');
var map = yield this.walletdb.getTXMap(hash);
@ -903,7 +912,7 @@ TXDB.prototype.addTXRecord = co(function* addTXRecord(tx) {
* @returns {Promise}
*/
TXDB.prototype.removeTXRecord = co(function* removeTXRecord(tx) {
TXDB.prototype.removeTXMap = co(function* removeTXMap(tx) {
var hash = tx.hash('hex');
var map = yield this.walletdb.getTXMap(hash);
@ -921,6 +930,49 @@ TXDB.prototype.removeTXRecord = co(function* removeTXRecord(tx) {
this.walletdb.writeTXMap(this.wallet, hash, map);
});
/**
* Append to the global unspent record.
* @param {Hash} hash
* @param {Number} index
* @returns {Promise}
*/
TXDB.prototype.addOutpointMap = co(function* addOutpointMap(hash, i) {
var map = yield this.walletdb.getOutpointMap(hash, i);
if (!map)
map = new OutpointMapRecord(hash, i);
if (!map.add(this.wallet.wid))
return;
this.walletdb.writeOutpointMap(this.wallet, hash, i, map);
});
/**
* Remove from the global unspent record.
* @param {Hash} hash
* @param {Number} index
* @returns {Promise}
*/
TXDB.prototype.removeOutpointMap = co(function* removeOutpointMap(hash, i) {
var map = yield this.walletdb.getOutpointMap(hash, i);
if (!map)
return;
if (!map.remove(this.wallet.wid))
return;
if (map.wids.length === 0) {
this.walletdb.unwriteOutpointMap(this.wallet, hash, i);
return;
}
this.walletdb.writeOutpointMap(this.wallet, hash, i, map);
});
/**
* Append to the global block record.
* @param {TX} tx
@ -928,7 +980,7 @@ TXDB.prototype.removeTXRecord = co(function* removeTXRecord(tx) {
* @returns {Promise}
*/
TXDB.prototype.addBlockRecord = co(function* addBlockRecord(tx, height) {
TXDB.prototype.addBlockMap = co(function* addBlockMap(tx, height) {
var hash = tx.hash('hex');
var block = yield this.walletdb.getBlockMap(height);
@ -948,7 +1000,7 @@ TXDB.prototype.addBlockRecord = co(function* addBlockRecord(tx, height) {
* @returns {Promise}
*/
TXDB.prototype.removeBlockRecord = co(function* removeBlockRecord(tx, height) {
TXDB.prototype.removeBlockMap = co(function* removeBlockMap(tx, height) {
var hash = tx.hash('hex');
var block = yield this.walletdb.getBlockMap(height);
@ -1181,7 +1233,7 @@ TXDB.prototype.insert = co(function* insert(tx, block) {
// possible to compare the on-chain
// state vs. the mempool state.
credit.spent = true;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
} else {
// If the tx is mined, we can safely
// remove the coin being spent. This
@ -1189,7 +1241,7 @@ TXDB.prototype.insert = co(function* insert(tx, block) {
// coin so it can be reconnected
// later during a reorg.
this.pending.confirmed -= coin.value;
this.removeCredit(credit, path);
yield this.removeCredit(credit, path);
}
updated = true;
@ -1221,7 +1273,7 @@ TXDB.prototype.insert = co(function* insert(tx, block) {
if (tx.height !== -1)
this.pending.confirmed += output.value;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
updated = true;
}
@ -1259,10 +1311,10 @@ TXDB.prototype.insert = co(function* insert(tx, block) {
this.put(layout.H(account, tx.height, hash), DUMMY);
}
yield this.addTXRecord(tx);
yield this.addTXMap(tx);
if (tx.height !== -1)
yield this.addBlockRecord(tx, tx.height);
yield this.addBlockMap(tx, tx.height);
// Update the transaction counter and
// commit the new state. This state will
@ -1380,7 +1432,7 @@ TXDB.prototype._confirm = co(function* confirm(tx, block) {
// been removed on-chain.
this.pending.confirmed -= coin.value;
this.removeCredit(credit, path);
yield this.removeCredit(credit, path);
}
}
@ -1411,7 +1463,7 @@ TXDB.prototype._confirm = co(function* confirm(tx, block) {
this.pending.confirmed += output.value;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
}
// Remove the RBF index if we have one.
@ -1431,10 +1483,10 @@ TXDB.prototype._confirm = co(function* confirm(tx, block) {
this.put(layout.H(account, tx.height, hash), DUMMY);
}
yield this.addTXRecord(tx);
yield this.addTXMap(tx);
if (tx.height !== -1)
yield this.addBlockRecord(tx, tx.height);
yield this.addBlockMap(tx, tx.height);
// Commit the new state. The balance has updated.
this.put(layout.R, this.pending.commit());
@ -1511,7 +1563,7 @@ TXDB.prototype.erase = co(function* erase(tx) {
this.pending.confirmed += coin.value;
this.unspendCredit(tx, i);
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
}
}
@ -1534,7 +1586,7 @@ TXDB.prototype.erase = co(function* erase(tx) {
if (tx.height !== -1)
this.pending.confirmed -= output.value;
this.removeCredit(credit, path);
yield this.removeCredit(credit, path);
}
// Remove the RBF index if we have one.
@ -1563,10 +1615,10 @@ TXDB.prototype.erase = co(function* erase(tx) {
this.del(layout.H(account, tx.height, hash));
}
yield this.removeTXRecord(tx);
yield this.removeTXMap(tx);
if (tx.height !== -1)
yield this.removeBlockRecord(tx, tx.height);
yield this.removeBlockMap(tx, tx.height);
// Update the transaction counter
// and commit new state due to
@ -1701,7 +1753,7 @@ TXDB.prototype.disconnect = co(function* disconnect(tx) {
// Resave the credit and mark it
// as spent in the mempool instead.
credit.spent = true;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
}
}
@ -1734,10 +1786,10 @@ TXDB.prototype.disconnect = co(function* disconnect(tx) {
this.pending.confirmed -= output.value;
this.saveCredit(credit, path);
yield this.saveCredit(credit, path);
}
yield this.removeBlockRecord(tx, height);
yield this.removeBlockMap(tx, height);
// We need to update the now-removed
// block properties and reindex due
@ -3152,7 +3204,7 @@ function Details(txdb, tx) {
this.wid = this.wallet.wid;
this.id = this.wallet.id;
this.chainHeight = txdb.walletdb.height;
this.chainHeight = txdb.walletdb.state.height;
this.hash = tx.hash('hex');
this.tx = tx;

View File

@ -23,12 +23,14 @@ var ldb = require('../db/ldb');
var Bloom = require('../utils/bloom');
var Logger = require('../node/logger');
var TX = require('../primitives/tx');
var Outpoint = require('../primitives/outpoint');
var records = require('./records');
var ChainState = records.ChainState;
var BlockMapRecord = records.BlockMapRecord;
var HeaderRecord = records.HeaderRecord;
var BlockMeta = records.BlockMeta;
var PathMapRecord = records.PathMapRecord;
var TXMapRecord = records.TXMapRecord;
var OutpointMapRecord = records.OutpointMapRecord;
var TXDB = require('./txdb');
var U32 = utils.U32;
@ -133,6 +135,16 @@ var layout = {
},
ee: function ee(key) {
return key.toString('hex', 1);
},
o: function o(hash, index) {
var key = new Buffer(37);
key[0] = 0x01;
key.write(hash, 1, 'hex');
key.writeUInt32BE(index, 33, true);
return key;
},
oo: function oo(key) {
return [key.toString('hex', 1, 33), key.readUInt32BE(33, true)];
}
};
@ -169,7 +181,6 @@ function WalletDB(options) {
this.state = new ChainState();
this.depth = 0;
this.wallets = {};
this.genesis = HeaderRecord.fromEntry(this.network.genesis);
this.keepBlocks = this.network.block.keepBlocks;
// We need one read lock for `get` and `create`.
@ -201,10 +212,6 @@ function WalletDB(options) {
utils.inherits(WalletDB, AsyncObject);
WalletDB.prototype.__defineGetter__('height', function() {
return this.state.tip.height;
});
/**
* Database layout.
* @type {Object}
@ -235,8 +242,8 @@ WalletDB.prototype._open = co(function* open() {
this.logger.info(
'WalletDB loaded (depth=%d, height=%d, start=%d).',
this.depth,
this.state.tip.height,
this.state.start.height);
this.state.height,
this.state.startHeight);
});
/**
@ -265,11 +272,11 @@ WalletDB.prototype._close = co(function* close() {
*/
WalletDB.prototype.watch = co(function* watch() {
var hashes = yield this.getFilterHashes();
var data = yield this.getFilterData();
this.logger.info('Adding %d hashes to WalletDB filter.', hashes.length);
this.logger.info('Adding %d hashes to WalletDB filter.', data.length);
this.addFilter(hashes);
this.addFilter(data);
});
/**
@ -293,14 +300,14 @@ WalletDB.prototype.sync = co(function* sync() {
*/
WalletDB.prototype._sync = co(function* connect() {
var height = this.state.tip.height;
var height = this.state.height;
var tip, entry;
if (!this.client)
return;
while (height >= 0) {
tip = yield this.getHeader(height);
tip = yield this.getBlock(height);
if (!tip)
break;
@ -314,8 +321,8 @@ WalletDB.prototype._sync = co(function* connect() {
}
if (!entry) {
height = this.state.start.height;
entry = yield this.client.getEntry(this.state.start.hash);
height = this.state.startHeight;
entry = yield this.client.getEntry(this.state.startHash);
if (!entry)
height = 0;
@ -361,39 +368,45 @@ WalletDB.prototype._rescan = co(function* rescan(height) {
WalletDB.prototype.scan = co(function* scan(height) {
var self = this;
var tip;
if (!this.client)
return;
if (height == null)
height = this.state.startHeight;
assert(utils.isUInt32(height), 'WDB: Must pass in a height.');
if (height > this.state.tip.height)
if (height > this.state.height)
throw new Error('WDB: Cannot rescan future blocks.');
yield this.rollback(height);
this.logger.info(
'WalletDB is scanning %d blocks.',
this.state.tip.height - height + 1);
this.state.height - height + 1);
yield this.client.scan(this.state.tip.hash, this.filter, function(block, txs) {
tip = yield this.getTip();
yield this.client.scan(tip.hash, this.filter, function(block, txs) {
return self._addBlock(block, txs);
});
});
/**
* Add address or tx hashes to chain server filter.
* @param {Hashes[]} hashes
* Add address or outpoints to chain server filter.
* @param {Hashes[]} chunks
* @returns {Promise}
*/
WalletDB.prototype.watchHash = co(function* watchHash(hashes) {
WalletDB.prototype.watchData = co(function* watchData(chunks) {
if (!this.client) {
this.emit('watch hash', hashes);
this.emit('watch data', chunks);
return;
}
yield this.client.watchHash(hashes);
yield this.client.watchData(chunks);
});
/**
@ -630,8 +643,8 @@ WalletDB.prototype.commit = co(function* commit(wallet) {
* @returns {Boolean}
*/
WalletDB.prototype.testFilter = function testFilter(hash) {
return this.filter.test(hash, 'hex');
WalletDB.prototype.testFilter = function testFilter(data) {
return this.filter.test(data, 'hex');
};
/**
@ -640,18 +653,18 @@ WalletDB.prototype.testFilter = function testFilter(hash) {
* @param {Hash} hash
*/
WalletDB.prototype.addFilter = function addFilter(hashes) {
var i, hash;
WalletDB.prototype.addFilter = function addFilter(chunks) {
var i, data;
if (!Array.isArray(hashes))
hashes = [hashes];
if (!Array.isArray(chunks))
chunks = [chunks];
if (this.client)
this.client.watchHash(hashes);
this.client.watchData(chunks);
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
this.filter.add(hash, 'hex');
for (i = 0; i < chunks.length; i++) {
data = chunks[i];
this.filter.add(data, 'hex');
}
};
@ -1172,28 +1185,44 @@ WalletDB.prototype.getTXHashes = function getTXHashes() {
});
};
/**
* Get all tx hashes.
* @returns {Promise}
*/
WalletDB.prototype.getOutpoints = function getOutpoints() {
return this.db.keys({
gte: layout.o(constants.NULL_HASH, 0),
lte: layout.o(constants.HIGH_HASH, 0xffffffff),
parse: function(key) {
var items = layout.oo(key);
return new Outpoint(items[0], items[1]);
}
});
};
/**
* Get hashes required for rescan filter.
* @returns {Promise}
*/
WalletDB.prototype.getFilterHashes = co(function* getFilterHashes() {
var hashes = [];
var addr = yield this.getHashes();
var tx = yield this.getTXHashes();
var i, hash;
WalletDB.prototype.getFilterData = co(function* getFilterData() {
var chunks = [];
var hashes = yield this.getHashes();
var unspent = yield this.getOutpoints();
var i, hash, outpoint;
for (i = 0; i < addr.length; i++) {
hash = addr[i];
hashes.push(hash);
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
chunks.push(hash);
}
for (i = 0; i < tx.length; i++) {
hash = tx[i];
hashes.push(hash);
for (i = 0; i < unspent.length; i++) {
outpoint = unspent[i];
chunks.push(outpoint.toRaw());
}
return hashes;
return chunks;
});
/**
@ -1485,10 +1514,10 @@ WalletDB.prototype.getWalletsByInsert = co(function* getWalletsByInsert(tx) {
input = tx.inputs[i];
prevout = input.prevout;
if (!this.testFilter(prevout.hash))
if (!this.testFilter(prevout.toRaw()))
continue;
map = yield this.getTXMap(prevout.hash);
map = yield this.getOutpointMap(prevout.hash, prevout.index);
if (!map)
continue;
@ -1535,16 +1564,16 @@ WalletDB.prototype.init = co(function* init() {
if (this.client) {
tip = yield this.client.getTip();
assert(tip);
tip = HeaderRecord.fromEntry(tip);
tip = BlockMeta.fromEntry(tip);
} else {
tip = this.genesis;
tip = BlockMeta.fromEntry(this.network.genesis);
}
this.logger.info(
'Initializing WalletDB chain state at %s (%d).',
utils.revHex(tip.hash), tip.height);
yield this.syncState(tip, true);
yield this.resetState(tip);
});
/**
@ -1563,40 +1592,85 @@ WalletDB.prototype.getState = co(function* getState() {
/**
* Write the connecting block immediately.
* @param {HeaderRecord} tip
* @param {BlockMeta} tip
* @returns {Promise}
*/
WalletDB.prototype.syncState = co(function* syncState(tip, start) {
WalletDB.prototype.resetState = co(function* resetState(tip) {
var batch = this.db.batch();
var state = this.state.clone();
var height = this.state.tip.height;
var i, blocks;
var iter, item;
if (start)
state.start = tip;
iter = this.db.iterator({
gte: layout.c(0),
lte: layout.c(0xffffffff),
values: false
});
state.tip = tip;
for (;;) {
item = yield iter.next();
// Blocks ahead of our new tip that we need to delete.
if (height !== -1) {
blocks = height - tip.height;
if (blocks > 0) {
blocks = Math.min(blocks, this.keepBlocks);
for (i = 0; i < blocks; i++) {
batch.del(layout.c(height));
height--;
}
if (!item)
break;
try {
batch.del(item.key);
} catch (e) {
yield iter.end();
throw e;
}
}
// Prune old blocks.
height = tip.height - this.keepBlocks;
if (height >= 0)
batch.del(layout.c(height));
state.startHeight = tip.height;
state.startHash = tip.hash;
state.height = tip.height;
batch.put(layout.c(tip.height), tip.toHash());
batch.put(layout.R, state.toRaw());
yield batch.write();
this.state = state;
});
/**
* Write the connecting block immediately.
* @param {BlockMeta} tip
* @returns {Promise}
*/
WalletDB.prototype.syncState = co(function* syncState(tip) {
var batch = this.db.batch();
var state = this.state.clone();
var i, state, height, blocks;
if (tip.height < state.height) {
// Hashes ahead of our new tip
// that we need to delete.
height = state.height;
blocks = height - tip.height;
if (blocks > this.keepBlocks)
blocks = this.keepBlocks;
for (i = 0; i < blocks; i++) {
batch.del(layout.c(height));
height--;
}
} else if (tip.height > state.height) {
// Prune old hashes.
assert(tip.height === state.height + 1, 'Bad chain sync.');
height = tip.height - this.keepBlocks;
if (height >= 0)
batch.del(layout.c(height));
}
state.height = tip.height;
// Save tip and state.
batch.put(layout.c(tip.height), tip.toRaw());
batch.put(layout.c(tip.height), tip.toHash());
batch.put(layout.R, state.toRaw());
yield batch.write();
@ -1629,7 +1703,7 @@ WalletDB.prototype.unwriteBlockMap = function unwriteBlockMap(wallet, height) {
};
/**
* Connect a transaction.
* Add a transaction to global tx map.
* @param {Wallet} wallet
* @param {Hash} hash
* @param {TXMapRecord} map
@ -1643,7 +1717,7 @@ WalletDB.prototype.writeTXMap = function writeTXMap(wallet, hash, map) {
};
/**
* Connect a transaction.
* Remove a transaction from global tx map.
* @param {Wallet} wallet
* @param {Hash} hash
* @returns {Promise}
@ -1654,6 +1728,34 @@ WalletDB.prototype.unwriteTXMap = function unwriteTXMap(wallet, hash) {
batch.del(layout.e(hash));
};
/**
* Add an outpoint to global unspent map.
* @param {Wallet} wallet
* @param {Hash} hash
* @param {Number} index
* @param {OutpointMapRecord} map
* @returns {Promise}
*/
WalletDB.prototype.writeOutpointMap = function writeOutpointMap(wallet, hash, i, map) {
var batch = this.batch(wallet);
batch.put(layout.o(hash, i), map.toRaw());
this.addFilter(new Outpoint(hash, i).toRaw());
};
/**
* Remove an outpoint from global unspent map.
* @param {Wallet} wallet
* @param {Hash} hash
* @param {Number} index
* @returns {Promise}
*/
WalletDB.prototype.unwriteOutpointMap = function unwriteOutpointMap(wallet, hash, i) {
var batch = this.batch(wallet);
batch.del(layout.o(hash, i));
};
/**
* Get a wallet block (with hashes).
* @param {Hash} hash
@ -1670,18 +1772,38 @@ WalletDB.prototype.getBlockMap = co(function* getBlockMap(height) {
});
/**
* Get a wallet block (with hashes).
* Get a wallet block meta.
* @param {Hash} hash
* @returns {Promise}
*/
WalletDB.prototype.getHeader = co(function* getHeader(height) {
WalletDB.prototype.getBlock = co(function* getBlock(height) {
var data = yield this.db.get(layout.c(height));
var block;
if (!data)
return;
return HeaderRecord.fromRaw(data);
block = new BlockMeta();
block.hash = data.toString('hex');
block.height = height;
return block;
});
/**
* Get wallet tip.
* @param {Hash} hash
* @returns {Promise}
*/
WalletDB.prototype.getTip = co(function* getTip() {
var tip = yield this.getBlock(this.state.height);
if (!tip)
throw new Error('WDB: Tip not found!');
return tip;
});
/**
@ -1699,6 +1821,22 @@ WalletDB.prototype.getTXMap = co(function* getTXMap(hash) {
return TXMapRecord.fromRaw(hash, data);
});
/**
* Get a Unspent->Wallet map.
* @param {Hash} hash
* @param {Number} index
* @returns {Promise}
*/
WalletDB.prototype.getOutpointMap = co(function* getOutpointMap(hash, i) {
var data = yield this.db.get(layout.o(hash, i));
if (!data)
return;
return OutpointMapRecord.fromRaw(hash, i, data);
});
/**
* Sync with chain height.
* @param {Number} height
@ -1706,40 +1844,41 @@ WalletDB.prototype.getTXMap = co(function* getTXMap(hash) {
*/
WalletDB.prototype.rollback = co(function* rollback(height) {
var tip, blocks;
var tip;
if (this.state.tip.height <= height)
if (this.state.height <= height)
return;
this.logger.info(
'Rolling back %d WalletDB blocks to height %d.',
this.state.tip.height - height, height);
this.state.height - height, height);
tip = yield this.getHeader(height);
if (!tip) {
blocks = this.state.tip.height - height;
if (blocks < this.keepBlocks)
throw new Error('WDB: Block not found for rollback.');
if (height >= this.state.start.height) {
yield this.revert(this.state.start.height);
yield this.syncState(this.state.start, true);
this.logger.warning(
'WalletDB rolled back to start block (%d).',
this.state.tip.height);
} else {
yield this.revert(0);
yield this.syncState(this.genesis, true);
this.logger.warning('WalletDB rolled back to genesis block.');
}
tip = yield this.getBlock(height);
if (tip) {
yield this.revert(tip.height);
yield this.syncState(tip);
return;
}
yield this.revert(height);
yield this.syncState(tip);
tip = new BlockMeta();
if (height >= this.state.startHeight) {
tip.height = this.state.startHeight;
tip.hash = this.state.startHash;
this.logger.warning(
'Rolling back WalletDB to start block (%d).',
this.state.tip.height);
} else {
tip.height = 0;
tip.hash = this.network.genesis.hash;
this.logger.warning('Rolling back WalletDB to genesis block.');
}
yield this.revert(tip.height);
yield this.resetState(tip);
});
/**
@ -1804,35 +1943,34 @@ WalletDB.prototype.addBlock = co(function* addBlock(entry, txs) {
*/
WalletDB.prototype._addBlock = co(function* addBlock(entry, txs) {
var tip = BlockMeta.fromEntry(entry);
var total = 0;
var i, tip, tx;
var i, tx;
if (entry.height < this.state.tip.height) {
if (tip.height < this.state.height) {
this.logger.warning(
'WalletDB is connecting low blocks (%d).',
entry.height);
tip.height);
return total;
}
if (entry.height === this.state.tip.height) {
if (tip.height === this.state.height) {
// We let blocks of the same height
// through specifically for rescans:
// we always want to rescan the last
// block since the state may have
// updated before the block was fully
// processed (in the case of a crash).
this.logger.warning('Duplicate connection for %d.', entry.height);
} else if (entry.height !== this.state.tip.height + 1) {
this.logger.warning('Already saw WalletDB block (%d).', tip.height);
} else if (tip.height !== this.state.height + 1) {
throw new Error('WDB: Bad connection (height mismatch).');
}
tip = HeaderRecord.fromEntry(entry);
yield this.syncState(tip);
if (this.options.useCheckpoints) {
if (tip.height <= this.network.checkpoints.lastHeight)
return 0;
return total;
}
for (i = 0; i < txs.length; i++) {
@ -1873,24 +2011,25 @@ WalletDB.prototype.removeBlock = co(function* removeBlock(entry) {
*/
WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
var tip = BlockMeta.fromEntry(entry);
var i, tx, prev, block;
if (entry.height > this.state.tip.height) {
if (tip.height > this.state.height) {
this.logger.warning(
'WalletDB is disconnecting high blocks (%d).',
entry.height);
tip.height);
return 0;
}
if (entry.height !== this.state.tip.height)
if (tip.height !== this.state.height)
throw new Error('WDB: Bad disconnection (height mismatch).');
prev = yield this.getHeader(entry.height - 1);
prev = yield this.getBlock(tip.height - 1);
if (!prev)
throw new Error('WDB: Bad disconnection (no previous block).');
block = yield this.getBlockMap(entry.height);
block = yield this.getBlockMap(tip.height);
if (!block) {
yield this.syncState(prev);
@ -1905,7 +2044,7 @@ WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
yield this.syncState(prev);
this.logger.warning('Disconnected wallet block %s (tx=%d).',
utils.revHex(entry.hash), block.txs.length);
utils.revHex(tip.hash), block.txs.length);
return block.txs.length;
});
@ -1920,17 +2059,17 @@ WalletDB.prototype._removeBlock = co(function* removeBlock(entry) {
WalletDB.prototype.addTX = co(function* addTX(tx) {
var unlock = yield this.txLock.lock();
var entry;
var block;
try {
if (tx.height !== -1) {
entry = yield this.getHeader(tx.height);
block = yield this.getBlock(tx.height);
if (!entry)
throw new Error('WDB: Inserting unconfirmed transaction.');
if (!block)
throw new Error('WDB: Inserting confirmed transaction.');
if (tx.block !== entry.hash)
throw new Error('WDB: Inserting unconfirmed transaction.');
if (tx.block !== block.hash)
throw new Error('WDB: Inserting confirmed transaction.');
this.logger.warning('WalletDB is inserting confirmed transaction.');
}
@ -1945,7 +2084,7 @@ WalletDB.prototype.addTX = co(function* addTX(tx) {
* Add a transaction to the database without a lock.
* @private
* @param {TX} tx
* @param {HeaderRecord} block
* @param {BlockMeta} block
* @returns {Promise}
*/
@ -1989,7 +2128,7 @@ WalletDB.prototype._insert = co(function* insert(tx, block) {
* relevant wallets without a lock.
* @private
* @param {TXHash} hash
* @param {HeaderRecord} block
* @param {BlockMeta} block
* @returns {Promise}
*/

View File

@ -128,7 +128,7 @@ describe('Chain', function() {
it('should handle a reorg', cob(function* () {
var entry, block, forked;
assert.equal(walletdb.height, chain.height);
assert.equal(walletdb.state.height, chain.height);
assert.equal(chain.height, 10);
entry = yield chain.db.get(tip2.hash);
@ -227,8 +227,7 @@ describe('Chain', function() {
assert(wallet.account.receiveDepth >= 8);
assert(wallet.account.changeDepth >= 7);
assert.equal(walletdb.height, chain.height);
assert.equal(walletdb.state.tip.hash, chain.tip.hash);
assert.equal(walletdb.state.height, chain.height);
txs = yield wallet.getHistory();
assert.equal(txs.length, 44);