tx-by-height. update node and pool to use walletdb.

This commit is contained in:
Christopher Jeffrey 2016-03-02 15:20:39 -08:00
parent 689db5632e
commit ad8090dc7c
5 changed files with 238 additions and 194 deletions

View File

@ -100,13 +100,9 @@ Fullnode.prototype._init = function _init() {
// Emit events for any TX we see that's
// is relevant to one of our wallets.
this.on('tx', function(tx) {
return;
self.walletdb.ownTX(tx, function(err, input, output) {
if (err)
return self.emit('error', err);
if (input || output)
self.emit('wallet tx', tx, input || [], output || []);
self.walletdb.tx.addTX(tx, function(err, updated) {
if (updated)
self.emit('wallet tx', tx);
});
});
@ -134,10 +130,12 @@ Fullnode.prototype._init = function _init() {
// Handle forks by unconfirming txs
// in our wallets' tx pools.
this.chain.on('remove entry', function(entry) {
self.wallets.forEach(function(wallet) {
wallet.tx.getAll().forEach(function(tx) {
if (tx.block === entry.hash || tx.height >= entry.height)
wallet.tx.unconfirm(tx);
self.walletdb.tx.getHeightHashes(entry.height, function(err, txs) {
if (err)
return self.emit('error', err);
txs.forEach(function(tx) {
self.walletdb.tx.unconfirm(tx);
});
});
});
@ -186,11 +184,7 @@ Fullnode.prototype.createWallet = function createWallet(options, callback) {
assert(wallet);
utils.debug('Loaded wallet with id=%s address=%s',
wallet.getID(), wallet.getAddress());
self.wallets.push(wallet);
return callback(null, wallet);
wallet.id, wallet.getAddress());
self.pool.addWallet(wallet, function(err) {
if (err)

View File

@ -44,7 +44,6 @@ function Node(options) {
this.chain = null;
this.miner = null;
this.profiler = null;
this.wallets = [];
Node.global = this;
}

View File

@ -139,10 +139,6 @@ function Pool(node, options) {
timeout: options.invTimeout || 60000
};
// Added and watched wallets
options.wallets = options.wallets || [];
this.wallets = [];
Pool.global = this;
this.loading = true;
@ -230,7 +226,7 @@ Pool.prototype._init = function _init() {
self.resolveOrphan(self.peers.load, null, data.hash);
});
this.options.wallets.forEach(function(wallet) {
(this.options.wallets || []).forEach(function(wallet) {
self.addWallet(wallet);
});
@ -1226,13 +1222,8 @@ Pool.prototype.addWallet = function addWallet(wallet, callback) {
callback = utils.asyncify(callback);
if (this.options.spv) {
if (this.wallets.indexOf(wallet) !== -1)
return callback();
if (this.options.spv)
this.watchWallet(wallet);
this.wallets.push(wallet);
}
wallet.getPending(function(err, txs) {
if (err)
@ -1250,19 +1241,11 @@ Pool.prototype.addWallet = function addWallet(wallet, callback) {
};
Pool.prototype.removeWallet = function removeWallet(wallet) {
var i;
if (!this.options.spv)
return;
i = this.wallets.indexOf(wallet);
assert(!this.loading);
if (i == -1)
return;
this.wallets.splice(i, 1);
this.unwatchWallet(wallet);
};

View File

@ -116,7 +116,8 @@ TXPool.prototype._add = function add(tx, callback, force) {
// and remove pending flag to mark as confirmed.
if (existing.ts === 0 && tx.ts !== 0) {
batch.put(p + 't/t/' + hash, tx.toExtended());
batch.del(p + 'p/' + hash);
batch.put(p + 't/h/' + tx.height + '/' + hash, new Buffer([]));
batch.del(p + 'p/t/' + hash);
tx.inputs.forEach(function(input) {
var type = input.getType();
@ -401,7 +402,9 @@ TXPool.prototype._add = function add(tx, callback, force) {
batch.put(p + 't/t/' + hash, tx.toExtended());
if (tx.ts === 0)
batch.put(p + 'p/' + hash, new Buffer([]));
batch.put(p + 'p/t/' + hash, new Buffer([]));
else
batch.put(p + 't/h/' + tx.height + '/' + hash, new Buffer([]));
tx.getAddresses().forEach(function(address) {
batch.put(p + 't/a/' + address + '/' + hash, new Buffer([]));
@ -451,7 +454,9 @@ TXPool.prototype.remove = function remove(hash, callback) {
batch.del(p + 't/t/' + hash);
if (tx.ts === 0)
batch.del(p + 'p/' + hash);
batch.del(p + 'p/t/' + hash);
else
batch.del(p + 't/h/' + tx.height + '/' + hash);
self.fillTX(tx, function(err) {
if (err)
@ -465,9 +470,6 @@ TXPool.prototype.remove = function remove(hash, callback) {
if (input.isCoinbase())
return;
if (!input.output)
return;
if (type === 'pubkey' || type === 'multisig')
address = null;
@ -552,7 +554,7 @@ TXPool.prototype.unconfirm = function unconfirm(hash, callback) {
hash = hash.hash('hex');
this.getTX(hash, function(err, tx) {
var batch;
var batch, height;
if (err)
return callback(err);
@ -560,6 +562,7 @@ TXPool.prototype.unconfirm = function unconfirm(hash, callback) {
if (!tx)
return callback(null, true);
height = tx.height;
tx.height = -1;
tx.ps = utils.now();
tx.ts = 0;
@ -567,7 +570,8 @@ TXPool.prototype.unconfirm = function unconfirm(hash, callback) {
tx.block = null;
batch.put(p + 't/t/' + hash, tx.toExtended());
batch.put(p + 'p/' + hash, new Buffer([]));
batch.put(p + 'p/t/' + hash, new Buffer([]));
batch.del(p + 't/h/' + height + '/' + hash);
tx.inputs.forEach(function(input) {
var type = input.getType();
@ -644,6 +648,7 @@ TXPool.prototype.getTXHashes = function getTXHashes(address, callback) {
var p = this.prefix + '/';
var self = this;
var txs = [];
var iter;
callback = utils.ensure(callback);
@ -667,9 +672,197 @@ TXPool.prototype.getTXHashes = function getTXHashes(address, callback) {
});
}
var iter = this.db.db.iterator({
gte: p + 't/a/' + address,
lte: p + 't/a/' + address + '~',
iter = this.db.db.iterator({
gte: address ? p + 't/a/' + address : p + 't/t',
lte: address ? p + 't/a/' + address + '~' : p + 't/t~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
callback(err);
});
}
if (key === undefined) {
return iter.end(function(err) {
if (err)
return callback(err);
return callback(null, txs);
});
}
if (address)
txs.push(key.split('/')[4]);
else
txs.push(key.split('/')[3]);
next();
});
})();
};
TXPool.prototype.getPendingHashes = function getPendingHashes(address, callback) {
var p = this.prefix + '/';
var self = this;
var txs = [];
var iter;
callback = utils.ensure(callback);
if (Array.isArray(address)) {
return utils.forEachSerial(address, function(address, next) {
assert(address);
self.getPendingHashes(address, function(err, tx) {
if (err)
return next(err);
txs = txs.concat(tx);
next();
});
}, function(err) {
if (err)
return callback(err);
txs = utils.uniqs(txs);
return callback(null, txs);
});
}
iter = this.db.db.iterator({
gte: address ? p + 'p/a/' + address : p + 'p/t',
lte: address ? p + 'p/a/' + address + '~' : p + 'p/t~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
callback(err);
});
}
if (key === undefined) {
return iter.end(function(err) {
if (err)
return callback(err);
return callback(null, txs);
});
}
if (address)
txs.push(key.split('/')[4]);
else
txs.push(key.split('/')[3]);
next();
});
})();
};
TXPool.prototype.getCoinIDs = function getCoinIDs(address, callback) {
var p = this.prefix + '/';
var self = this;
var coins = [];
var iter;
callback = utils.ensure(callback);
if (Array.isArray(address)) {
return utils.forEachSerial(address, function(address, next) {
self.getCoinIDs(address, function(err, coin) {
if (err)
return next(err);
coins = coins.concat(coin);
next();
});
}, function(err) {
if (err)
return callback(err);
coins = utils.uniqs(coins);
return callback(null, coins);
});
}
iter = this.db.db.iterator({
gte: address ? p + 'u/a/' + address : p + 'u/t',
lte: address ? p + 'u/a/' + address + '~' : p + 'u/t~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
callback(err);
});
}
if (key === undefined) {
return iter.end(function(err) {
if (err)
return callback(err);
return callback(null, coins);
});
}
if (address)
coins.push(key.split('/').slice(4).join('/'));
else
coins.push(key.split('/').slice(3).join('/'));
next();
});
})();
};
TXPool.prototype.getHeightHashes = function getHeightHashes(height, callback) {
var p = this.prefix + '/';
var self = this;
var txs = [];
var iter;
callback = utils.ensure(callback);
if (Array.isArray(height)) {
return utils.forEachSerial(height, function(height, next) {
self.getHeightHashes(height, function(err, tx) {
if (err)
return next(err);
txs = txs.concat(tx);
next();
});
}, function(err) {
if (err)
return callback(err);
return callback(null, txs);
});
}
iter = this.db.db.iterator({
gte: p + 't/h/' + height,
lte: p + 't/h/' + height + '~',
keys: true,
values: false,
fillCache: false,
@ -699,128 +892,6 @@ TXPool.prototype.getTXHashes = function getTXHashes(address, callback) {
})();
};
TXPool.prototype.getPendingHashes = function getPendingHashes(address, callback) {
var p = this.prefix + '/';
var self = this;
var txs = [];
callback = utils.ensure(callback);
if (Array.isArray(address)) {
return utils.forEachSerial(address, function(address, next) {
assert(address);
self.getPendingHashes(address, function(err, tx) {
if (err)
return next(err);
txs = txs.concat(tx);
next();
});
}, function(err) {
if (err)
return callback(err);
txs = utils.uniqs(txs);
return callback(null, txs);
});
}
var iter = this.db.db.iterator({
gte: address ? p + 'p/a/' + address : p + 'p',
lte: address ? p + 'p/a/' + address + '~' : p + 'p~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
callback(err);
});
}
if (key === undefined) {
return iter.end(function(err) {
if (err)
return callback(err);
return callback(null, txs);
});
}
if (address)
txs.push(key.split('/')[4]);
else
txs.push(key.split('/')[2]);
next();
});
})();
};
TXPool.prototype.getCoinIDs = function getCoinIDs(address, callback) {
var p = this.prefix + '/';
var self = this;
var coins = [];
callback = utils.ensure(callback);
if (Array.isArray(address)) {
return utils.forEachSerial(address, function(address, next) {
self.getCoinIDs(address, function(err, coin) {
if (err)
return next(err);
coins = coins.concat(coin);
next();
});
}, function(err) {
if (err)
return callback(err);
coins = utils.uniqs(coins);
return callback(null, coins);
});
}
var iter = this.db.db.iterator({
gte: p + 'u/a/' + address,
lte: p + 'u/a/' + address + '~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
callback(err);
});
}
if (key === undefined) {
return iter.end(function(err) {
if (err)
return callback(err);
return callback(null, coins);
});
}
coins.push(key.split('/').slice(4).join('/'));
next();
});
})();
};
TXPool.prototype.getTXByAddress = function getTXByAddress(address, callback) {
var self = this;
var txs = [];
@ -829,9 +900,6 @@ TXPool.prototype.getTXByAddress = function getTXByAddress(address, callback) {
if (err)
return callback(err);
if (!hashes.length)
return callback(null, hashes);
utils.forEachSerial(hashes, function(hash, next) {
self.getTX(hash, function(err, tx) {
if (err)
@ -860,7 +928,7 @@ TXPool.prototype.getLast = function getLast(address, callback) {
if (err)
return callback(err);
lastTs = -1;
lastTs = 0;
lastHeight = -1;
txs.forEach(function(tx) {
@ -871,9 +939,6 @@ TXPool.prototype.getLast = function getLast(address, callback) {
lastHeight = tx.height;
});
if (lastTs === -1)
lastTs = utils.now() - 2 * 7 * 24 * 60 * 60;
return callback(null, lastTs, lastHeight);
});
};
@ -885,9 +950,6 @@ TXPool.prototype.getPendingByAddress = function getPendingByAddress(address, cal
if (err)
return callback(err);
if (!hashes.length)
return callback(null, hashes);
utils.forEachSerial(hashes, function(hash, next) {
self.getTX(hash, function(err, tx) {
if (err)
@ -917,9 +979,6 @@ TXPool.prototype.getCoinByAddress = function getCoinByAddress(address, callback)
if (err)
return callback(err);
if (!ids.length)
return callback(null, ids);
utils.forEachSerial(ids, function(id, next) {
var parts = id.split('/');
self.getCoin(parts[0], +parts[1], function(err, coin) {
@ -1049,6 +1108,22 @@ TXPool.prototype.getCoin = function getCoin(hash, index, callback) {
});
};
TXPool.prototype.getAll = function getAll(callback) {
return this.getTXByAddress(null, callback);
};
TXPool.prototype.getUnspent = function getUnspent(callback) {
return this.getCoinByAddress(null, callback);
};
TXPool.prototype.getPending = function getPending(callback) {
return this.getPendingByAddress(null, callback);
};
TXPool.prototype.getBalance = function getBalance(callback) {
return this.getBalanceByAddress(null, callback);
};
TXPool.prototype.getAllByAddress = function getAllByAddress(address, callback) {
return this.getTXByAddress(address, callback);
};
@ -1057,7 +1132,7 @@ TXPool.prototype.getUnspentByAddress = function getUnspentByAddress(address, cal
return this.getCoinByAddress(address, callback);
};
TXPool.prototype.getPendingByAddress = function getPendingByAddress(address) {
TXPool.prototype.getPendingByAddress = function getPendingByAddress(address, callback) {
return this.getPendingByAddress(address, callback);
};

View File

@ -531,9 +531,11 @@ Wallet.prototype.tx = function _tx(options, outputs, callback) {
return callback(new Error('Incompatible locktime.'));
// Set the locktime to target value or
// `height - whatever` to avoid fee snipping.
// `height - whatever` to avoid fee sniping.
if (target.value > 0)
tx.setLocktime(target.value);
else if (options.locktime != null)
tx.setLocktime(options.locktime);
else
tx.avoidFeeSniping();
@ -541,16 +543,7 @@ Wallet.prototype.tx = function _tx(options, outputs, callback) {
if (!self.sign(tx))
return callback(new Error('Could not sign transaction.'));
if (self.m > 1)
return callback(null, tx);
// Add to pool
self.addTX(tx, function(err) {
if (err)
return callback(err);
callback(null, tx);
});
return callback(null, tx);
});
};